Class: Deimos::Utils::InlineConsumer
- Inherits:
-
Object
- Object
- Deimos::Utils::InlineConsumer
- Defined in:
- lib/deimos/utils/inline_consumer.rb
Overview
Class which can process/consume messages inline.
Constant Summary collapse
- MAX_MESSAGE_WAIT_TIME =
1.second
- MAX_TOPIC_WAIT_TIME =
10.seconds
Class Method Summary collapse
-
.consume(topic:, frk_consumer:, num_messages: 10) ⇒ Object
Consume the last X messages from a topic.
-
.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) ⇒ Array<Hash>
Get the last X messages from a topic.
Class Method Details
.consume(topic:, frk_consumer:, num_messages: 10) ⇒ Object
Consume the last X messages from a topic.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/deimos/utils/inline_consumer.rb', line 109 def self.consume(topic:, frk_consumer:, num_messages: 10) listener = SeekListener.new( handler: frk_consumer, group_id: SecureRandom.hex, topic: topic, heartbeat_interval: 1 ) listener. = # Add the start_time and last_message_time attributes to the # consumer class so we can kill it if it's gone on too long class << frk_consumer attr_accessor :start_time, :last_message_time end subscribers = [] subscribers << ActiveSupport::Notifications. subscribe('phobos.listener.process_message') do frk_consumer. = Time.zone.now end subscribers << ActiveSupport::Notifications. subscribe('phobos.listener.start_handler') do frk_consumer.start_time = Time.zone.now frk_consumer. = nil end subscribers << ActiveSupport::Notifications. subscribe('heartbeat.consumer.kafka') do if frk_consumer. if Time.zone.now - frk_consumer. > MAX_MESSAGE_WAIT_TIME raise Phobos::AbortError end elsif Time.zone.now - frk_consumer.start_time > MAX_TOPIC_WAIT_TIME Deimos.config.logger.error('Aborting - initial wait too long') raise Phobos::AbortError end end listener.start subscribers.each { |s| ActiveSupport::Notifications.unsubscribe(s) } end |
.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) ⇒ Array<Hash>
Get the last X messages from a topic. You can specify a subclass of Deimos::Consumer or Deimos::Producer, or provide the schema, namespace and key_config directly.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/deimos/utils/inline_consumer.rb', line 82 def self.(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) if config_class MessageBankHandler.config_class = config_class elsif schema.nil? || key_config.nil? raise 'You must specify either a config_class or a schema, namespace and key_config!' else MessageBankHandler.class_eval do schema schema namespace namespace key_config key_config @decoder = nil @key_decoder = nil end end self.consume(topic: topic, frk_consumer: MessageBankHandler, num_messages: ) = MessageBankHandler. .size <= ? : [-..-1] end |