Class: Deimos::Utils::SeekListener
- Inherits: Phobos::Listener
  - Object
  - Phobos::Listener
  - Deimos::Utils::SeekListener
- Defined in: lib/deimos/utils/inline_consumer.rb
Overview
Listener that can seek to get the last X messages in a topic.
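As a rough illustration of what "the last X messages" means in offset terms, the sketch below reproduces the arithmetic that #start_listener applies to the last offset of partition 0. `seek_target` is a hypothetical helper written for this example and is not part of Deimos or Phobos; the default of 10 mirrors the fallback that #start_listener assigns to num_messages.

```ruby
# Hypothetical helper (not part of Deimos): given the last offset of a
# partition, return the offset to seek to, or nil when the computed offset
# is not positive and no seek should be issued.
def seek_target(last_offset, num_messages = 10)
  offset = last_offset - num_messages
  offset.positive? ? offset : nil
end

puts seek_target(250).inspect # 240
puts seek_target(8).inspect   # nil
```

When the topic holds fewer messages than requested, the result is nil, which corresponds to the listener skipping the seek and starting from the consumer's default position.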
Constant Summary
- MAX_SEEK_RETRIES = 3
Instance Attribute Summary
- #num_messages ⇒ Object
  Returns the value of attribute num_messages.
Instance Method Summary
- #start_listener ⇒ Object
  :nodoc:
Instance Attribute Details
#num_messages ⇒ Object
Returns the value of attribute num_messages.
# File 'lib/deimos/utils/inline_consumer.rb', line 10

def num_messages
  @num_messages
end
Instance Method Details
#start_listener ⇒ Object
:nodoc:
# File 'lib/deimos/utils/inline_consumer.rb', line 13

def start_listener
  @num_messages ||= 10
  @consumer = create_kafka_consumer
  @consumer.subscribe(topic, @subscribe_opts)
  attempt = 0

  begin
    attempt += 1
    last_offset = @kafka_client.last_offset_for(topic, 0)
    offset = last_offset - num_messages
    if offset.positive?
      Deimos.config.logger.info("Seeking to #{offset}")
      @consumer.seek(topic, 0, offset)
    end
  rescue StandardError => e
    if attempt < MAX_SEEK_RETRIES
      sleep(1.seconds * attempt)
      retry
    end
    log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata)
  end

  instrument('listener.start_handler', listener_metadata) do
    @handler_class.start(@kafka_client)
  end
  log_info('Listener started', listener_metadata)
end
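The retry handling in #start_listener follows a begin/rescue/retry pattern with a delay that grows linearly with the attempt number. The sketch below isolates that pattern in plain Ruby so it can be run without Kafka. `with_linear_backoff` and its `sleeper` parameter are hypothetical names introduced for this example, the default of 3 attempts mirrors MAX_SEEK_RETRIES, and plain numbers stand in for ActiveSupport's `1.seconds`.

```ruby
# Hypothetical helper (not part of Deimos): runs the block, retrying on
# StandardError with a delay of base_delay * attempt between attempts.
# Returns the block's value, or nil once every attempt has failed.
# `sleeper` is injectable so the delays can be observed without waiting.
def with_linear_backoff(max_attempts: 3, base_delay: 1, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError => e
    if attempt < max_attempts
      sleeper.call(base_delay * attempt)
      retry
    end
    # Like the listener, report the failure and carry on instead of re-raising.
    warn "Giving up after #{max_attempts} attempts: #{e.message}"
    nil
  end
end

# Usage: fail twice, succeed on the third attempt, recording the delays.
delays = []
result = with_linear_backoff(sleeper: ->(s) { delays << s }) do |attempt|
  raise 'broker not ready' if attempt < 3
  "seeked on attempt #{attempt}"
end
puts result         # seeked on attempt 3
puts delays.inspect # [1, 2]
```

With three attempts the waits are 1 and 2 units, so a persistent failure costs about three seconds in the listener before it logs the error and proceeds to start the handler anyway.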