Class: SharedBroker::Adapters::Kafka

Inherits:
Base
  • Object
show all
Defined in:
lib/shared_broker/adapters/kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(seed_brokers:, client_id: "shared_broker") ⇒ Kafka

Returns a new instance of Kafka.



10
11
12
13
14
15
16
17
# File 'lib/shared_broker/adapters/kafka.rb', line 10

def initialize(seed_brokers:, client_id: "shared_broker")
  begin
    require "kafka"
  rescue LoadError
    raise unless defined?(::Kafka)
  end
  @kafka = ::Kafka.new(seed_brokers, client_id: client_id)
end

Instance Method Details

#publish(topic, message, correlation_id: nil) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/shared_broker/adapters/kafka.rb', line 19

def publish(topic, message, correlation_id: nil)
  unless message.is_a?(Hash)
    raise ArgumentError, "Expected message to be a Hash, got #{message.class} with value #{message.inspect}"
  end

  headers = {}
  headers["correlation_id"] = correlation_id if correlation_id
  
  @kafka.deliver_message(message.to_json, topic: topic, headers: headers)
end

#subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/shared_broker/adapters/kafka.rb', line 30

def subscribe(topic, queue_name, max_retries: 3, backoff_base: 2, &block)
  consumer = @kafka.consumer(group_id: queue_name)
  consumer.subscribe(topic)
  
  Thread.new do
    consumer.each_message do |message|
      data = JSON.parse(message.value, symbolize_names: true)
      if message.headers && message.headers["correlation_id"]
        data[:_correlation_id] = message.headers["correlation_id"]
      end
      
      attempts = 0
      begin
        block.call(data)
      rescue => e
        attempts += 1
        if attempts <= max_retries
          sleep(backoff_base**attempts)
          retry
        else
          publish_to_dlq(topic, queue_name, message.value, message.headers, e)
        end
      end
    end
  end
end