Class: Deimos::Utils::SeekListener
- Inherits:
- Phobos::Listener
    - Object
    - Phobos::Listener
    - Deimos::Utils::SeekListener
- Defined in:
- lib/deimos/utils/inline_consumer.rb
Overview
Listener that can seek to get the last X messages in a topic.
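As an illustration of what "the last X messages" means in offset terms, the arithmetic can be sketched in plain Ruby. `seek_offset_for` is a hypothetical helper, not part of Deimos or Phobos, and the sketch assumes a single partition whose last offset is already known.

```ruby
# Hypothetical helper (not part of Deimos): given the last offset of a
# partition and the number of messages wanted, return the offset to seek
# to, or nil when the result is not positive and no seek should happen.
def seek_offset_for(last_offset, num_messages)
  offset = last_offset - num_messages
  offset.positive? ? offset : nil
end

# Example calls (values are made up for illustration).
far_back   = seek_offset_for(100, 10) # plenty of history available
too_recent = seek_offset_for(5, 10)   # fewer messages than requested
```

Returning `nil` for a non-positive offset mirrors the listener's behavior of skipping the seek when the topic holds too few messages.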
Constant Summary
- MAX_SEEK_RETRIES = 3
Instance Attribute Details
#num_messages ⇒ Integer
    # File 'lib/deimos/utils/inline_consumer.rb', line 12
    def num_messages
      @num_messages
    end
Instance Method Details
#start_listener ⇒ void
This method returns an undefined value.
    # File 'lib/deimos/utils/inline_consumer.rb', line 15
    def start_listener
      @num_messages ||= 10
      @consumer = create_kafka_consumer
      @consumer.subscribe(topic, @subscribe_opts)
      attempt = 0

      begin
        attempt += 1
        # Only partition 0 is inspected.
        last_offset = @kafka_client.last_offset_for(topic, 0)
        offset = last_offset - num_messages
        if offset.positive?
          Deimos.config.logger.info("Seeking to #{offset}")
          @consumer.seek(topic, 0, offset)
        end
      rescue StandardError => e
        # Retry with a linearly increasing pause between attempts.
        if attempt < MAX_SEEK_RETRIES
          sleep(1.seconds * attempt)
          retry
        end
        log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata)
      end

      instrument('listener.start_handler', listener_metadata) do
        @handler_class.start(@kafka_client)
      end
      log_info('Listener started', listener_metadata)
    end
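The retry logic in `start_listener` follows a common Ruby pattern: count attempts, `rescue`, sleep a little longer each time, then `retry`. A minimal standalone sketch of that pattern is shown here. `with_linear_backoff` and its `sleeper` parameter are hypothetical names introduced for illustration; they are not part of Deimos or Phobos, and plain floats stand in for ActiveSupport's `1.seconds`.

```ruby
# Hypothetical helper (not part of Deimos): runs the block, retrying on
# StandardError with a pause of base_delay * attempt between tries.
# Returns the block's value, or nil once all attempts are used up.
# `sleeper` is injectable so the pause can be stubbed out in tests.
def with_linear_backoff(max_attempts: 3, base_delay: 1.0, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError => e
    if attempt < max_attempts
      sleeper.call(base_delay * attempt)
      retry # re-runs the begin block; `attempt` keeps its value
    end
    warn("Giving up after #{max_attempts} attempts: #{e.message}")
    nil
  end
end

# Usage: record the delays instead of sleeping, and fail twice before
# succeeding on the third attempt.
delays = []
result = with_linear_backoff(sleeper: ->(s) { delays << s }) do |attempt|
  raise 'broker unavailable' if attempt < 3
  :seeked
end
```

As in the listener, a final failure is reported rather than re-raised, so startup continues even when the seek never succeeds.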