Module: Karafka::Patches::RubyKafka
- Defined in:
- lib/karafka/patches/ruby_kafka.rb
Overview
Patches for Ruby Kafka gem
Instance Method Summary collapse
-
#consumer_loop ⇒ Object
This patch allows us to inject business logic in between fetches and before the consumer stop, so we can perform stop commit or anything else that we need since ruby-kafka fetch loop does not allow that directly We don't won't to use poll ruby-kafka api as it brings many more problems that we would have to take care of.
Instance Method Details
#consumer_loop ⇒ Object
This patch allows us to inject business logic in between fetches and before the consumer stop, so we can perform stop commit or anything else that we need since ruby-kafka fetch loop does not allow that directly We don't won't to use poll ruby-kafka api as it brings many more problems that we would have to take care of. That way, nothing like that ever happens but we get the control over the stopping process that we need (since we're the once that initiate it for each thread)
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/karafka/patches/ruby_kafka.rb', line 15 def consumer_loop super do consumers = Karafka::Persistence::Consumers .current .values .flat_map(&:values) .select { |consumer| consumer.class.respond_to?(:after_fetch) } if Karafka::App.stopping? publish_event(consumers, 'before_stop') Karafka::Persistence::Client.read.stop else publish_event(consumers, 'before_poll') yield publish_event(consumers, 'after_poll') end end end |