Class: Karafka::Fetcher
- Inherits:
-
Object
- Object
- Karafka::Fetcher
- Defined in:
- lib/karafka/fetcher.rb
Overview
Note:
Creating multiple fetchers will result in having multiple connections to the same topics, which means that if there are no partitions, it won't use them.
Class used to run the Karafka consumer and handle shutting down, restarting etc
Instance Method Summary collapse
-
#call ⇒ Object
Starts listening on all the listeners asynchronously Fetch loop should never end, which means that we won't create more actor clusters so we don't have to terminate them.
Instance Method Details
#call ⇒ Object
Starts listening on all the listeners asynchronously Fetch loop should never end, which means that we won't create more actor clusters so we don't have to terminate them
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/karafka/fetcher.rb', line 11 def call threads = listeners.map do |listener| # We abort on exception because there should be an exception handling developed for # each listener running in separate threads, so the exceptions should never leak # and if that happens, it means that something really bad happened and we should stop # the whole process Thread .new { listener.call } .tap { |thread| thread.abort_on_exception = true } end # We aggregate threads here for a supervised shutdown process threads.each { |thread| Karafka::Server.consumer_threads << thread } threads.each(&:join) # If anything crashes here, we need to raise the error and crush the runner because it means # that something terrible happened rescue StandardError => e Karafka.monitor.instrument('fetcher.call.error', caller: self, error: e) Karafka::App.stop! raise e end |