Module: Legion::Transport::Kafka::Consumer

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/kafka/consumer.rb

Overview

Consumer wraps rdkafka consumer handles for subscribe and replay operations. Each subscribe/replay call creates its own isolated consumer handle so that multiple topics and consumer groups can coexist in a single process.
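
Since every call builds its own handle, the group ID and start position are chosen per call. The following is a minimal sketch of how those two arguments could map onto librdkafka properties. The helper name `sketch_consumer_config`, the `brokers` parameter, and the broker address are assumptions made for illustration; the module's actual `consumer_config` is not shown on this page and may differ.

```ruby
# Hypothetical helper, not the library's consumer_config: it only shows how a
# per-call group ID and start position could map onto librdkafka properties.
def sketch_consumer_config(group:, from_beginning:, brokers: 'localhost:9092')
  {
    'bootstrap.servers' => brokers, # assumed broker address
    'group.id'          => group,   # one consumer group per call
    'auto.offset.reset' => from_beginning ? 'earliest' : 'latest'
  }
end

live   = sketch_consumer_config(group: 'orders-workers',  from_beginning: false)
replay = sketch_consumer_config(group: 'orders-replay-1', from_beginning: true)
```

Note that `auto.offset.reset` only applies when the group has no committed offset for a partition, so a start-position flag of this kind matters mainly for a fresh group ID.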

Class Method Details

.replay(topic, replay_group:, from_beginning: true, from_offset: nil, from_timestamp: nil) {|IncomingMessage| ... } ⇒ Object

Replays a topic from a chosen starting point under a dedicated consumer group, so the offsets committed by production groups are unaffected. The consumer is intended to read until it has caught up with the high-water mark at the time replay started and then exit. In the source listing for this method, the loop ends at the first poll that returns no message within the poll timeout.

Parameters:

  • topic (String)
  • from_beginning (Boolean) (defaults to: true)
  • from_offset (Integer, nil) (defaults to: nil)

    explicit partition-0 start offset

  • from_timestamp (Time, nil) (defaults to: nil)

    seek to nearest offset for this timestamp

  • replay_group (String)

    isolated group ID for this replay session

Yields:

  • (IncomingMessage)


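The termination rule used by replay can be sketched without a broker. `FakeConsumer` and `drain` below are hypothetical stand-ins written for this illustration, not part of rdkafka or Legion; the fake handle returns `nil` once its queue is empty, imitating a poll that timed out.

```ruby
# Hypothetical stand-in for an rdkafka consumer handle: poll returns the next
# queued message, or nil once nothing is left (imitating a poll timeout).
class FakeConsumer
  attr_reader :closed

  def initialize(messages)
    @messages = messages.dup
    @closed   = false
  end

  def poll(_timeout_ms)
    @messages.shift
  end

  def close
    @closed = true
  end
end

# Same loop shape as the replay listing: stop at the first empty poll and
# close the handle even if the loop raises.
def drain(consumer, timeout_ms: 100)
  seen = []
  begin
    loop do
      message = consumer.poll(timeout_ms)
      break if message.nil?

      seen << message
    end
  ensure
    consumer.close
  end
  seen
end

consumer = FakeConsumer.new(%w[a b c])
replayed = drain(consumer)
```

One consequence of this design is that a quiet gap longer than the poll timeout ends the replay, even if more messages arrive afterwards.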

# File 'lib/legion/transport/kafka/consumer.rb', line 68

def replay(topic, replay_group:, from_beginning: true, from_offset: nil, from_timestamp: nil)
  cfg      = consumer_config(group: replay_group, from_beginning: from_beginning)
  consumer = ::Rdkafka::Config.new(cfg).consumer

  begin
    consumer.subscribe(topic)

    # Seek if an explicit offset or timestamp was provided.
    if from_offset || from_timestamp
      seek_consumer(consumer, topic, from_offset:    from_offset,
                                     from_timestamp: from_timestamp)
    end

    log_replay(topic, replay_group, from_beginning: from_beginning,
                                    from_offset:    from_offset,
                                    from_timestamp: from_timestamp)

    loop do
      message = consumer.poll(poll_timeout_ms)
      break if message.nil?

      yield IncomingMessage.new(message)
    end
  ensure
    consumer.close
  end
rescue StandardError => e
  handle_exception(e, level: :error, handled: false, operation: 'transport.kafka.consumer.replay',
                   topic: topic, replay_group: replay_group)
  raise Legion::Transport::Kafka::ConsumerError, "Kafka replay error on #{topic}: #{e.message}"
end

.reset! ⇒ Object

The consumer holds no persistent state to reset; this method exists for symmetry with Producer.reset! and always returns true.



# File 'lib/legion/transport/kafka/consumer.rb', line 101

def reset!
  true
end

.subscribe(topics, group:, from_beginning: false, max_messages: nil) {|IncomingMessage| ... } ⇒ Object

Subscribes to one or more topics and yields each message to the block. The call runs synchronously in the calling thread; wrap it in a Thread or actor for background processing.

Parameters:

  • topics (String, Array<String>)
  • group (String)
  • from_beginning (Boolean) (defaults to: false)

    true = earliest, false = latest

  • max_messages (Integer, nil) (defaults to: nil)

    stop after N messages; nil = run until stopped

Yields:

  • (IncomingMessage)


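The two ways of ending a subscribe loop, reaching `max_messages` or raising `StopIteration` from the block, can be sketched with a scripted stand-in. `ScriptedConsumer` and `consume` are hypothetical names used only for this illustration. The sketch relies on Ruby's `Kernel#loop`, which rescues `StopIteration` and ends the loop cleanly.

```ruby
# Hypothetical stand-in for a consumer handle. Each scripted entry is returned
# by one poll; nil stands for a poll that timed out without a message.
class ScriptedConsumer
  def initialize(script)
    @script = script.dup
  end

  def poll(_timeout_ms)
    raise 'script exhausted' if @script.empty?

    @script.shift
  end
end

# Same loop shape as the subscribe listing: empty polls are skipped rather
# than ending the loop, and the loop stops once max_messages is reached.
def consume(consumer, max_messages: nil)
  count = 0
  loop do
    message = consumer.poll(100)
    next if message.nil?

    yield message
    count += 1
    break if max_messages && count >= max_messages
  end
  count
end

# Stop after a fixed number of messages.
seen  = []
taken = consume(ScriptedConsumer.new(['a', nil, 'b', 'c']), max_messages: 2) { |m| seen << m }

# Stop from inside the block; Kernel#loop rescues StopIteration.
stopped = []
handled = consume(ScriptedConsumer.new(['a', nil, 'b', :stop, 'c'])) do |m|
  raise StopIteration if m == :stop

  stopped << m
end
```

In both runs the count reflects only messages whose block call completed, so the message that triggers `StopIteration` is not counted.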

# File 'lib/legion/transport/kafka/consumer.rb', line 26

def subscribe(topics, group:, from_beginning: false, max_messages: nil)
  topic_list = Array(topics)
  cfg        = consumer_config(group: group, from_beginning: from_beginning)
  consumer   = ::Rdkafka::Config.new(cfg).consumer

  begin
    consumer.subscribe(*topic_list)
    log_subscribe(topic_list, group)

    count = 0
    loop do
      message = consumer.poll(poll_timeout_ms)
      next if message.nil?

      yield IncomingMessage.new(message)

      count += 1
      commit_if_needed(consumer, count)
      break if max_messages && count >= max_messages
    end
  rescue StopIteration
    # clean exit from consumer loop
  ensure
    consumer.close
  end
rescue StandardError => e
  handle_exception(e, level: :error, handled: false, operation: 'transport.kafka.consumer.subscribe',
                   group: group, topics: topic_list.join(','))
  raise Legion::Transport::Kafka::ConsumerError, "Kafka consumer error: #{e.message}"
end