Class: SharedBroker::Adapters::Kafka
- Defined in:
- lib/shared_broker/adapters/kafka.rb
Instance Method Summary collapse
-
#initialize(seed_brokers:, client_id: "shared_broker") ⇒ Kafka
constructor
A new instance of Kafka.
- #publish(topic, message, correlation_id: nil) ⇒ Object
- #subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
Constructor Details
#initialize(seed_brokers:, client_id: "shared_broker") ⇒ Kafka
Returns a new instance of Kafka.
10 11 12 13 14 15 16 17 |
# File 'lib/shared_broker/adapters/kafka.rb', line 10 def initialize(seed_brokers:, client_id: "shared_broker") begin require "kafka" rescue LoadError raise unless defined?(::Kafka) end @kafka = ::Kafka.new(seed_brokers, client_id: client_id) end |
Instance Method Details
#publish(topic, message, correlation_id: nil) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/shared_broker/adapters/kafka.rb', line 19 def publish(topic, , correlation_id: nil) unless .is_a?(Hash) raise ArgumentError, "Expected message to be a Hash, got #{.class} with value #{.inspect}" end headers = {} headers["correlation_id"] = correlation_id if correlation_id @kafka.(.to_json, topic: topic, headers: headers) end |
#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/shared_broker/adapters/kafka.rb', line 30 def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) consumer = @kafka.consumer(group_id: queue_name) consumer.subscribe(topic) Thread.new do consumer. do || data = JSON.parse(.value, symbolize_names: true) if .headers && .headers["correlation_id"] data[:_correlation_id] = .headers["correlation_id"] end attempts = 0 begin block.call(data) rescue => e attempts += 1 if attempts <= max_retries sleep(backoff_base**attempts) retry else publish_to_dlq(topic, queue_name, .value, .headers, e) end end end end end |