Class: Karafka::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/runner.rb

Overview

Class used to run the Karafka listeners in separate threads

Instance Method Summary collapse

Instance Method Details

#callObject

Starts listening on all the listeners asynchronously and handles the jobs queue closing after listeners are done with their work.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/karafka/runner.rb', line 13

def call
  jobs_queue = Karafka::Server.jobs_queue
  workers = Karafka::Server.workers

  # Wire up the circular dependency between pool and queue
  workers.jobs_queue = jobs_queue
  jobs_queue.pool = workers

  listeners = Connection::ListenersBatch.new(jobs_queue)

  # We mark it prior to delegating to the manager as manager will have to start at least one
  # connection to Kafka, hence running
  Karafka::App.run!

  # Register all the listeners so they can be started and managed
  manager.register(listeners)

  # We aggregate threads here for a supervised shutdown process
  Karafka::Server.listeners = listeners

  # Start worker threads after listeners are created so a failure in the boot steps above
  # does not leave live worker threads blocked on an open queue.
  workers.scale(Karafka::App.config.concurrency)

  until manager.done?
    conductor.wait

    manager.control
  end

  # We close the jobs queue only when no listener threads are working.
  # This ensures, that everything was closed prior to us not accepting anymore jobs and that
  # no more jobs will be enqueued. Since each listener waits for jobs to finish, once those
  # are done, we can close.
  jobs_queue.close

  # All the workers need to stop processing anything before we can stop the runner completely
  # This ensures that even async long-running jobs have time to finish before we are done
  # with everything. One thing worth keeping in mind though: It is the end user responsibility
  # to handle the shutdown detection in their long-running processes. Otherwise if timeout
  # is exceeded, there will be a forced shutdown.
  workers.join
# If anything crashes here, we need to raise the error and crush the runner because it means
# that something terrible happened
rescue => e
  Karafka.monitor.instrument(
    "error.occurred",
    caller: self,
    error: e,
    type: "runner.call.error"
  )
  Karafka::App.stop!

  # Clean up workers so we don't leak threads blocked on the queue
  jobs_queue.close
  workers.join

  raise e
end