Class: Wurk::Manager
Overview
One per Capsule. Lives inside each forked child and owns the Processor pool. Replaces dead processors on the fly (replace-on-die), forwards the quiet/stop signals received by the Swarm to its processors, and ensures in-flight UnitsOfWork are bulk_requeued before threads are killed.
Lifecycle:
* `start` — spawn each processor thread.
* `quiet` — stop fetching; in-flight jobs run to completion.
* `stop(deadline)`— quiet + wait for drain; hard_shutdown on timeout.
Spec: docs/target/sidekiq-free.md §13.
Constant Summary collapse
- PAUSE_TIME =
0.1 in TTY mode so interactive shutdown feels snappy; 0.5 in production so the supervisor isn’t spinning while threads drain.
$stdout.tty? ? 0.1 : 0.5
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary collapse
-
#capsule ⇒ Object
readonly
Returns the value of attribute capsule.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Attributes included from Component
Instance Method Summary collapse
-
#hard_shutdown ⇒ Object
Reached when the deadline expired with workers still busy.
-
#initialize(capsule) ⇒ Manager
constructor
A new instance of Manager.
-
#processor_result(processor, _reason = nil) ⇒ Object
Processor#run callback: invoked when a Processor thread exits, whether cleanly or via raised exception.
- #quiet ⇒ Object
- #start ⇒ Object
-
#stop(deadline) ⇒ Object
Graceful shutdown: quiet first, then poll for workers to clear.
- #stopped? ⇒ Boolean
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(capsule) ⇒ Manager
Returns a new instance of Manager.
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/wurk/manager.rb', line 27 def initialize(capsule) @config = @capsule = capsule @count = capsule.concurrency raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1 @done = false @workers = Set.new @plock = ::Mutex.new @count.times do @workers << Processor.new(@capsule, &method(:processor_result)) end end |
Instance Attribute Details
#capsule ⇒ Object (readonly)
Returns the value of attribute capsule.
25 26 27 |
# File 'lib/wurk/manager.rb', line 25 def capsule @capsule end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
25 26 27 |
# File 'lib/wurk/manager.rb', line 25 def workers @workers end |
Instance Method Details
#hard_shutdown ⇒ Object
Reached when the deadline expired with workers still busy. We must push their in-flight UoWs back to the public queues BEFORE raising Wurk::Shutdown into the threads — losing a job is worse than running it twice (Sidekiq’s at-least-once contract).
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/wurk/manager.rb', line 94 def hard_shutdown # rubocop:disable Metrics/AbcSize cleanup = nil @plock.synchronize do cleanup = @workers.dup end if cleanup.any? jobs = cleanup.map(&:job).compact logger.warn { "Terminating #{cleanup.size} busy threads" } logger.debug { "Jobs still in progress #{jobs.inspect}" } capsule.fetcher.bulk_requeue(jobs) end cleanup.each(&:kill) # The caller typically `exit`s immediately after we return; give # threads a brief window to run their `ensure` blocks. deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3 wait_for(deadline) { @workers.empty? } end |
#processor_result(processor, _reason = nil) ⇒ Object
Processor#run callback: invoked when a Processor thread exits, whether cleanly or via raised exception. Removes the dead processor from the pool and (unless we’re already stopping) spawns a replacement so the capsule’s concurrency stays constant.
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/wurk/manager.rb', line 79 def processor_result(processor, _reason = nil) @plock.synchronize do @workers.delete(processor) unless @done p = Processor.new(@capsule, &method(:processor_result)) @workers << p p.start end end end |
#quiet ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/wurk/manager.rb', line 44 def quiet return if @done @done = true logger.info { "Terminating quiet threads for #{capsule.name} capsule" } @workers.each(&:terminate) end |
#start ⇒ Object
40 41 42 |
# File 'lib/wurk/manager.rb', line 40 def start @workers.each(&:start) end |
#stop(deadline) ⇒ Object
Graceful shutdown: quiet first, then poll for workers to clear. If the deadline elapses with workers still alive we fall through to hard_shutdown, which bulk_requeues their UoWs before killing threads.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/wurk/manager.rb', line 55 def stop(deadline) quiet # Lifecycle hooks (e.g. :quiet) can be async; give them a tick to settle # before we start polling. Matches Sidekiq's PAUSE_TIME behavior. sleep PAUSE_TIME return if @workers.empty? logger.info { 'Pausing to allow jobs to finish...' } wait_for(deadline) { @workers.empty? } return if @workers.empty? hard_shutdown ensure capsule.stop end |
#stopped? ⇒ Boolean
71 72 73 |
# File 'lib/wurk/manager.rb', line 71 def stopped? @done end |