Module: Legion::Transport::Kafka::Consumer
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/transport/kafka/consumer.rb
Overview
Consumer wraps rdkafka consumer handles for subscribe and replay operations. Each subscribe/replay call creates its own isolated consumer handle so that multiple topics and consumer groups can coexist in a single process.
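Each isolated handle is built from a config hash passed to `::Rdkafka::Config.new(cfg).consumer` (see the method bodies below). As a rough illustration, a `consumer_config` helper like the one used here might return standard librdkafka settings such as these; the real method's contents are not shown on this page, so treat every key and default below as an assumption:

```ruby
# Hedged sketch of a consumer_config helper. The broker env var, the
# auto-commit choice, and the exact keys are illustrative assumptions,
# not the library's actual implementation.
def consumer_config(group:, from_beginning:)
  {
    'bootstrap.servers'  => ENV.fetch('KAFKA_BROKERS', 'localhost:9092'),
    'group.id'           => group,
    # librdkafka setting: 'earliest' makes a new group read the log from
    # the start; 'latest' makes it pick up only messages produced after
    # the group first subscribes.
    'auto.offset.reset'  => from_beginning ? 'earliest' : 'latest',
    'enable.auto.commit' => false
  }
end
```

Because each `subscribe`/`replay` call builds a fresh hash and a fresh handle from it, two calls with different `group:` values never share offsets or state.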
Class Method Summary
-
.replay(topic, replay_group:, from_beginning: true, from_offset: nil, from_timestamp: nil) {|IncomingMessage| ... } ⇒ Object
Replay a topic from a specific point without disturbing production offsets.
-
.reset! ⇒ Object
No persistent state to reset; included for symmetry with Producer.reset!.
-
.subscribe(topics, group:, from_beginning: false, max_messages: nil) {|IncomingMessage| ... } ⇒ Object
Subscribe to one or more topics and yield each message to the block.
Class Method Details
.replay(topic, replay_group:, from_beginning: true, from_offset: nil, from_timestamp: nil) {|IncomingMessage| ... } ⇒ Object
Replay a topic from a specific point without disturbing production offsets. A temporary consumer group is used so that production group offsets are unaffected. The consumer reads until caught up to the high-water mark at the time replay started, then exits.
# File 'lib/legion/transport/kafka/consumer.rb', line 68

def replay(topic, replay_group:, from_beginning: true, from_offset: nil, from_timestamp: nil)
  cfg = consumer_config(group: replay_group, from_beginning: from_beginning)
  consumer = ::Rdkafka::Config.new(cfg).consumer
  begin
    consumer.subscribe(topic)
    # Seek if an explicit offset or timestamp was provided.
    if from_offset || from_timestamp
      seek_consumer(consumer, topic, from_offset: from_offset, from_timestamp: from_timestamp)
    end
    log_replay(topic, replay_group, from_beginning: from_beginning,
               from_offset: from_offset, from_timestamp: from_timestamp)
    loop do
      message = consumer.poll(poll_timeout_ms)
      break if message.nil?

      yield IncomingMessage.new(message)
    end
  ensure
    consumer.close
  end
rescue StandardError => e
  handle_exception(e, level: :error, handled: false,
                   operation: 'transport.kafka.consumer.replay',
                   topic: topic, replay_group: replay_group)
  raise Legion::Transport::Kafka::ConsumerError, "Kafka replay error on #{topic}: #{e.message}"
end
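The catch-up-and-exit rule ("read until caught up to the high-water mark at the time replay started") can be sketched with an in-memory stand-in for the Kafka log. `FakeLog`, `FakeMessage`, and `replay_until_caught_up` are illustrative names, not part of the library:

```ruby
# In-memory stand-in for a topic partition. Offsets are 0-based and the
# high-water mark is one past the last message, as in Kafka.
FakeMessage = Struct.new(:offset, :payload)

class FakeLog
  def initialize(payloads)
    @messages = payloads.each_with_index.map { |p, i| FakeMessage.new(i, p) }
  end

  def high_watermark
    @messages.size
  end

  def poll(offset)
    @messages[offset]
  end
end

# Mirror of Consumer.replay's termination rule: record the high-water
# mark once at the start, read up to it, then stop. Messages produced
# after replay begins would not be consumed.
def replay_until_caught_up(log, from_offset: 0)
  hwm = log.high_watermark
  out = []
  offset = from_offset
  while offset < hwm
    msg = log.poll(offset)
    break if msg.nil?

    out << msg.payload
    offset += 1
  end
  out
end
```

Against the real API, the equivalent call would be `Consumer.replay('events', replay_group: 'events-replay-tmp', from_offset: 42) { |msg| ... }`, with the temporary `replay_group` keeping the production group's committed offsets untouched.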
.reset! ⇒ Object
No persistent state to reset; included for symmetry with Producer.reset!.
# File 'lib/legion/transport/kafka/consumer.rb', line 101

def reset!
  true
end
.subscribe(topics, group:, from_beginning: false, max_messages: nil) {|IncomingMessage| ... } ⇒ Object
Subscribe to one or more topics and yield each message to the block. Runs synchronously in the calling thread — wrap in a Thread or actor for background processing.
# File 'lib/legion/transport/kafka/consumer.rb', line 26

def subscribe(topics, group:, from_beginning: false, max_messages: nil)
  topic_list = Array(topics)
  cfg = consumer_config(group: group, from_beginning: from_beginning)
  consumer = ::Rdkafka::Config.new(cfg).consumer
  begin
    consumer.subscribe(*topic_list)
    log_subscribe(topic_list, group)
    count = 0
    loop do
      message = consumer.poll(poll_timeout_ms)
      next if message.nil?

      yield IncomingMessage.new(message)
      count += 1
      commit_if_needed(consumer, count)
      break if max_messages && count >= max_messages
    end
  rescue StopIteration
    # clean exit from consumer loop
  ensure
    consumer.close
  end
rescue StandardError => e
  handle_exception(e, level: :error, handled: false,
                   operation: 'transport.kafka.consumer.subscribe',
                   group: group, topics: topic_list.join(','))
  raise Legion::Transport::Kafka::ConsumerError, "Kafka consumer error: #{e.message}"
end
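The shape of the consumption loop (skip empty polls, count delivered messages, commit periodically, stop at `max_messages`) can be sketched without Kafka. `drain` and its `commit_every` parameter are illustrative; the real periodic-commit policy lives in `commit_if_needed`, whose interval is not shown on this page:

```ruby
# Sketch of subscribe's bounded loop over a plain array, where nil plays
# the role of a poll timeout with no message. Returns what was handled
# plus how many commits the (assumed) every-N policy would have issued.
def drain(messages, max_messages: nil, commit_every: 5)
  handled = []
  commits = 0
  count = 0
  messages.each do |msg|
    next if msg.nil?                 # poll returned nothing this tick

    handled << msg
    count += 1
    commits += 1 if (count % commit_every).zero?  # stand-in for commit_if_needed
    break if max_messages && count >= max_messages
  end
  [handled, commits]
end
```

The real call, `Consumer.subscribe(%w[topic_a topic_b], group: 'workers', max_messages: 100) { |msg| handle(msg) }`, blocks the calling thread; wrap it in a `Thread` for background processing, as the summary above notes.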