Class: SharedBroker::Adapters::Kafka
- Inherits:
-
Base
- Object
- Base
- SharedBroker::Adapters::Kafka
show all
- Defined in:
- lib/shared_broker/adapters/kafka.rb
Instance Method Summary
collapse
Methods inherited from Base
#publish_batch, #redrive_dlq
Constructor Details
#initialize(seed_brokers:, client_id: "shared_broker") ⇒ Kafka
Returns a new instance of Kafka.
10
11
12
13
14
15
16
17
|
# File 'lib/shared_broker/adapters/kafka.rb', line 10
# Builds the adapter around a `::Kafka` client.
#
# @param seed_brokers [Object] forwarded as the first positional argument to
#   `::Kafka.new` (presumably a list of broker addresses; confirm with callers)
# @param client_id [String] forwarded to `::Kafka.new` as `client_id:`
# @raise [LoadError] when "kafka" cannot be required and `::Kafka` is not
#   already defined
def initialize(seed_brokers:, client_id: "shared_broker")
  # The client library is loaded lazily. A failed require is tolerated only
  # when a ::Kafka constant is already present.
  begin
    require "kafka"
  rescue LoadError
    kafka_available = defined?(::Kafka)
    raise unless kafka_available
  end

  client_options = { client_id: client_id }
  @kafka = ::Kafka.new(seed_brokers, **client_options)
end
|
Instance Method Details
#publish(topic, message, correlation_id: nil) ⇒ Object
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/shared_broker/adapters/kafka.rb', line 19
# Publishes a single message to +topic+ as a JSON document.
#
# @param topic [String] topic handed to `@kafka.deliver_message` as `topic:`
# @param message [Hash] payload; serialized with `#to_json`
# @param correlation_id [String, nil] when truthy, sent as the
#   "correlation_id" message header; otherwise the headers hash is empty
# @return [Object] whatever `@kafka.deliver_message` returns
# @raise [ArgumentError] if +message+ is not a Hash
def publish(topic, message, correlation_id: nil)
  unless message.is_a?(Hash)
    raise ArgumentError, "Expected message to be a Hash, got #{message.class} with value #{message.inspect}"
  end

  # Headers are always passed (possibly empty) so the call shape stays the same.
  headers = {}
  headers["correlation_id"] = correlation_id if correlation_id

  @kafka.deliver_message(message.to_json, topic: topic, headers: headers)
end
|
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/shared_broker/adapters/kafka.rb', line 30
# Consumes +topic+ on a background thread and yields each decoded message.
#
# Each message value is parsed as JSON with symbolized keys. If the message
# carries a "correlation_id" header, it is copied into the payload under
# +:_correlation_id+ before the block is called.
#
# Failure handling per message:
# * `SharedBroker::ShutdownError` stops the consumer (no retry, no DLQ).
# * Any other StandardError is retried up to +max_retries+ times, sleeping
#   `backoff_base**attempts` seconds before each retry. Once retries are
#   exhausted, the raw value, headers and error are handed to
#   `publish_to_dlq` (defined outside this method).
#
# @param topic [String] topic to subscribe to
# @param queue_name [String] used as the consumer `group_id`
# @param max_retries [Integer] retries allowed after the first failure
# @param backoff_base [Numeric] base of the exponential backoff, in seconds
# @yieldparam data [Hash] parsed payload with symbol keys
# @return [Thread] the thread running the consume loop
def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  consumer = @kafka.consumer(group_id: queue_name)
  consumer.subscribe(topic)

  Thread.new do
    consumer.each_message do |message|
      data = JSON.parse(message.value, symbolize_names: true)
      if message.headers && message.headers["correlation_id"]
        data[:_correlation_id] = message.headers["correlation_id"]
      end

      attempts = 0
      begin
        block.call(data)
      rescue SharedBroker::ShutdownError
        consumer.stop
      rescue => e
        attempts += 1
        if attempts <= max_retries
          sleep(backoff_base**attempts)
          retry
        else
          # Retries exhausted: forward the original (unparsed) value and headers.
          publish_to_dlq(topic, queue_name, message.value, message.headers, e)
        end
      end
    end
  end
end
|