Class: Karafka::Swarm::Supervisor
- Inherits:
- Object
- Object
- Karafka::Swarm::Supervisor
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/swarm/supervisor.rb
Overview
Note:
Technically speaking, the supervisor is never in the running state, because we do not want it to hold any sockets or other resources that could break under forking. It has its own “supervising” state, from which it can go to the final shutdown.
Supervisor that starts forks and uses a monitor to watch them. It also handles the shutdown of all processes, including itself.
If any node dies, it is restarted.
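The fork-and-restart behaviour described above can be illustrated with a minimal sketch. This is not Karafka's implementation: `ToySupervisor`, `spawn_node`, and `max_restarts` are hypothetical names, and the children are short-lived placeholders rather than real consumer nodes. It only shows the general idea of waiting for any child to exit and forking a replacement.

```ruby
# Minimal fork-and-restart sketch. ToySupervisor is a hypothetical class
# used for illustration only; it is not part of Karafka.
class ToySupervisor
  attr_reader :restarts

  def initialize(nodes_count)
    @nodes_count = nodes_count
    @pids = {} # pid => node id
    @restarts = 0
  end

  # Forks one child per node id
  def start
    @nodes_count.times { |id| spawn_node(id) }
  end

  # Waits for any child to exit and forks a replacement,
  # until max_restarts replacements have been made
  def supervise(max_restarts:)
    while @restarts < max_restarts
      pid = Process.wait # blocks until any child exits
      id = @pids.delete(pid)
      next unless id

      @restarts += 1
      spawn_node(id)
    end
  end

  # Asks the remaining children to stop and reaps them
  def shutdown
    @pids.each_key do |pid|
      begin
        Process.kill('TERM', pid)
      rescue Errno::ESRCH
        # Child is already gone
      end
      Process.wait(pid)
    end
    @pids.clear
  end

  private

  # Placeholder node: sleeps briefly and exits, which simulates a node dying
  def spawn_node(id)
    pid = fork do
      sleep(0.05)
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    @pids[pid] = id
  end
end

supervisor = ToySupervisor.new(2)
supervisor.start
supervisor.supervise(max_restarts: 3)
supervisor.shutdown
```

The parent in this sketch does nothing except fork, wait, and reap, which mirrors the note above about keeping the supervisor free of resources that do not survive forking.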
Instance Method Summary
- #initialize ⇒ Supervisor (constructor)
  A new instance of Supervisor.
- #run ⇒ Object
  Creates the needed number of forks, installs signal handlers, and starts supervision.
Constructor Details
#initialize ⇒ Supervisor
Returns a new instance of Supervisor.
    # File 'lib/karafka/swarm/supervisor.rb', line 35
    def initialize
      @mutex = Mutex.new
      @queue = Processing::TimedQueue.new
    end
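The constructor only allocates a mutex and a timed queue; the listing does not show how they are used. As a rough sketch, assuming a timed queue is a queue whose `pop` gives up after a timeout, the idea could look like the following. `ToyTimedQueue` is a hypothetical stand-in and not Karafka's `Processing::TimedQueue`.

```ruby
# Hypothetical timed queue: pop returns an item, or nil once the timeout expires.
# Illustration only; this is not Karafka's Processing::TimedQueue.
class ToyTimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Adds an item and wakes up one waiting consumer
  def <<(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
    self
  end

  # Waits up to `timeout` seconds for an item; returns nil on timeout
  def pop(timeout:)
    deadline = now + timeout

    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - now
        return nil if remaining <= 0

        # Releases the mutex while waiting; the loop guards against spurious wakeups
        @cond.wait(@mutex, remaining)
      end

      @items.shift
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = ToyTimedQueue.new
timed_out = queue.pop(timeout: 0.05) # nothing was pushed, so this times out

Thread.new do
  sleep(0.05)
  queue << :child_changed
end
woken = queue.pop(timeout: 5) # returns as soon as the item arrives
```

A queue like this lets a loop sleep for a bounded time while still being woken early by another thread.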
Instance Method Details
#run ⇒ Object
Creates the needed number of forks, installs signal handlers, and starts supervision.
    # File 'lib/karafka/swarm/supervisor.rb', line 41
    def run
      # Close producer just in case. While it should not be used, we do not want even a
      # theoretical case since librdkafka is not thread-safe.
      # We close it prior to forking just to make sure, there is no issue with initialized
      # producer (should not be initialized but just in case)
      Karafka.producer.close

      Karafka::App.warmup

      manager.start

      process.on_sigint { stop }
      process.on_sigquit { stop }
      process.on_sigterm { stop }
      process.on_sigtstp { quiet }
      process.on_sigttin { signal('TTIN') }
      # Needed to be registered as we want to unlock on child changes
      process.on_sigchld {}
      process.on_any_active { unlock }
      process.supervise

      Karafka::App.supervise!

      loop do
        return if Karafka::App.terminated?

        lock
        control
      end
    # If anything went wrong, signal this and die
    # Supervisor is meant to be thin and not cause any issues. If you encounter this case
    # please report it as it should be considered critical
    rescue StandardError => e
      monitor.instrument(
        'error.occurred',
        caller: self,
        error: e,
        manager: manager,
        type: 'swarm.supervisor.error'
      )

      manager.terminate
      manager.cleanup

      raise e
    end
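The interplay between the signal callbacks and the `lock` / `control` loop in the listing above can be modelled with a small toy. The sketch below is an assumption-laden illustration, not Karafka code: `ToyProcess` and `ToyLoop` are hypothetical, signals are simulated by calling `deliver` instead of using real traps, and `lock` here blocks without a timeout, whereas the real supervisor may also wake up periodically.

```ruby
# Toy model of a signal-driven supervision loop. ToyProcess and ToyLoop are
# hypothetical names for illustration; they are not Karafka classes.
class ToyProcess
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a callback for a signal name, or for :any signal
  def on(signal, &block)
    @callbacks[signal] << block
  end

  # Simulates delivery: runs the signal's callbacks, then the :any callbacks
  def deliver(signal)
    (@callbacks[signal] + @callbacks[:any]).each(&:call)
  end
end

class ToyLoop
  attr_reader :log

  def initialize(process)
    @log = []
    @terminated = false
    @wakeups = Queue.new

    # Registered up front (not in run) so the demo has no start-up race
    process.on('INT') { stop }
    process.on('TSTP') { @log << :quiet }
    # Empty on purpose: it exists only so that :any fires on child changes
    process.on('CHLD') {}
    process.on(:any) { unlock }
  end

  def run
    loop do
      return if @terminated

      lock
      control
    end
  end

  private

  def stop
    @log << :stop
    @terminated = true
  end

  # Blocks until some signal callback calls unlock
  def lock
    @wakeups.pop
  end

  def unlock
    @wakeups << true
  end

  # Placeholder for "inspect the children and react"
  def control
    @log << :control
  end
end

process = ToyProcess.new
toy = ToyLoop.new(process)
worker = Thread.new { toy.run }
Thread.pass until worker.status == 'sleep' # wait until the loop is parked in lock
%w[CHLD TSTP INT].each { |signal| process.deliver(signal) }
worker.join
```

In this model the empty `CHLD` callback does no work by itself; its only effect is that the catch-all callback runs and wakes the loop, which matches the comment in the listing about unlocking on child changes.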