Class: Wurk::Launcher
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/launcher.rb
Overview
Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler poller, and the heartbeat thread. The heartbeat WIRE lives in Wurk::Heartbeat — Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.
Lifecycle:
* `run(async_beat:)` — freeze config, start heartbeat, poller, managers.
* `quiet` — stop fetching across all managers + poller.
* `stop` — graceful drain inside `config[:timeout]`.
* `heartbeat` — one-shot beat (also driven by the heartbeat thread).
`flush_stats` rolls per-process Processor counters (PROCESSED / FAILURE / EXPIRED) into the global + per-day Redis strings every beat. Per-day keys carry `STATS_TTL` so old days expire automatically.
Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).
Constant Summary
- STATS_TTL = 5 * 365 * 24 * 60 * 60
5 years, in seconds. Per-day `stat:processed:YYYY-MM-DD` / `stat:failed:YYYY-MM-DD` / `stat:expired:YYYY-MM-DD` strings carry this TTL so they roll off without manual cleanup.
- BEAT_PAUSE = Heartbeat::BEAT_PAUSE
Re-exported for test/third-party callers that read it off Launcher (Sidekiq’s drop-in surface). The single source of truth is Heartbeat.
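As a quick check of the `STATS_TTL` arithmetic above, the following sketch recomputes the value and builds one per-day key. The constant name `STATS_TTL_SKETCH`, the sample date, and the use of UTC for the day suffix are assumptions made for illustration; the source only states the `YYYY-MM-DD` key shape.

```ruby
# Illustrative only: recomputes the STATS_TTL expression from the docs.
STATS_TTL_SKETCH = 5 * 365 * 24 * 60 * 60 # hypothetical local name

# The expression uses 365-day years, so leap days are not counted.
ttl_days = STATS_TTL_SKETCH / (24 * 60 * 60)

# Assumed date and time zone; only the YYYY-MM-DD suffix comes from the docs.
day = Time.utc(2024, 1, 15).strftime('%Y-%m-%d')
per_day_key = "stat:processed:#{day}"

puts STATS_TTL_SKETCH # 157680000
puts ttl_days         # 1825
puts per_day_key      # stat:processed:2024-01-15
```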
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #cron_poller ⇒ Object
Returns the value of attribute cron_poller.
- #heartbeat_thread ⇒ Object (readonly)
Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
- #history ⇒ Object
Returns the value of attribute history.
- #managers ⇒ Object
Returns the value of attribute managers.
- #metrics_rollup ⇒ Object
Returns the value of attribute metrics_rollup.
- #poller ⇒ Object
Returns the value of attribute poller.
- #queue_rollup ⇒ Object
Returns the value of attribute queue_rollup.
Attributes included from Component
Instance Method Summary
- #flush_stats ⇒ Object
Rolls in-process Processor counters into Redis.
- #heartbeat ⇒ Object
One-shot beat.
- #initialize(config, embedded: false) ⇒ Launcher (constructor)
A new instance of Launcher.
- #quiet ⇒ Object
Idempotent.
- #run(async_beat: true) ⇒ Object
Starts the heartbeat, pollers, and managers; boot order matters.
- #stop ⇒ Object
Graceful shutdown.
- #stopping? ⇒ Boolean
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config, embedded: false) ⇒ Launcher
Returns a new instance of Launcher.
# File 'lib/wurk/launcher.rb', line 48
def initialize(config, embedded: false)
  @config = config
  @embedded = @done = false
  @managers = config.capsules.values.map { |cap| Manager.new(cap) }
  @poller = build_poller
  @cron_poller = build_cron_poller
  @metrics_rollup = build_metrics_rollup
  @queue_rollup = build_queue_rollup
  @history = build_history
  @leader = build_leader
  @reaper = build_reaper
  @started_at = nil
  @heartbeat = nil
  @heartbeat_thread = nil
  @health_server = build_health_server
end
Instance Attribute Details
#cron_poller ⇒ Object
Returns the value of attribute cron_poller.
# File 'lib/wurk/launcher.rb', line 46
def cron_poller
  @cron_poller
end
#heartbeat_thread ⇒ Object (readonly)
Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
# File 'lib/wurk/launcher.rb', line 164
def heartbeat_thread
  @heartbeat_thread
end
#history ⇒ Object
Returns the value of attribute history.
# File 'lib/wurk/launcher.rb', line 46
def history
  @history
end
#managers ⇒ Object
Returns the value of attribute managers.
# File 'lib/wurk/launcher.rb', line 46
def managers
  @managers
end
#metrics_rollup ⇒ Object
Returns the value of attribute metrics_rollup.
# File 'lib/wurk/launcher.rb', line 46
def metrics_rollup
  @metrics_rollup
end
#poller ⇒ Object
Returns the value of attribute poller.
# File 'lib/wurk/launcher.rb', line 46
def poller
  @poller
end
#queue_rollup ⇒ Object
Returns the value of attribute queue_rollup.
# File 'lib/wurk/launcher.rb', line 46
def queue_rollup
  @queue_rollup
end
Instance Method Details
#flush_stats ⇒ Object
Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.
# File 'lib/wurk/launcher.rb', line 148
def flush_stats
  processed = Processor::PROCESSED.reset
  failed = Processor::FAILURE.reset
  expired = Processor::EXPIRED.reset
  return if processed.zero? && failed.zero? && expired.zero?

  write_stats(processed, failed, expired)
rescue StandardError => e
  # Replay-safety: counters were reset above, so a Redis blip would
  # otherwise drop stats. We log and accept — the per-job at-least-once
  # semantics don't apply to *counters*, and the next beat resets again.
  handle_exception(e, { context: 'flush_stats' })
end
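The reset-then-write pattern in `flush_stats` can be sketched without Redis. Everything below is hypothetical scaffolding: `SketchCounter` stands in for the Processor counters, `SketchStore` for the pipelined Redis writes, and the un-suffixed `stat:processed` / `stat:failed` global key names are an assumption (the source only spells out the per-day key shape). The expired counter is left out for brevity.

```ruby
# Hypothetical stand-in for Processor::PROCESSED and friends.
class SketchCounter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(by = 1)
    @lock.synchronize { @value += by }
  end

  # Returns the current value and zeroes the counter in one locked step.
  def reset
    @lock.synchronize do
      current = @value
      @value = 0
      current
    end
  end
end

# Hypothetical in-memory store; one incrby_all call models one pipeline.
class SketchStore
  attr_reader :data, :flushes

  def initialize
    @data = Hash.new(0)
    @flushes = 0
  end

  def incrby_all(pairs)
    @flushes += 1
    pairs.each { |key, by| @data[key] += by }
  end
end

# Drains the counters and writes them, skipping the store when all are zero.
def sketch_flush(counters, store, day)
  processed = counters[:processed].reset
  failed = counters[:failed].reset
  return false if processed.zero? && failed.zero?

  store.incrby_all({
    'stat:processed' => processed,          # assumed global key name
    "stat:processed:#{day}" => processed,
    'stat:failed' => failed,                # assumed global key name
    "stat:failed:#{day}" => failed
  })
  true
end

counters = { processed: SketchCounter.new, failed: SketchCounter.new }
store = SketchStore.new
3.times { counters[:processed].incr }
counters[:failed].incr

first = sketch_flush(counters, store, '2024-01-15')
second = sketch_flush(counters, store, '2024-01-15') # counters are zero again
```

The second call returns without touching the store, which mirrors the "skip when all counters are zero" behaviour described above.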
#heartbeat ⇒ Object
One-shot beat. Public for embedded mode (and for tests) — the heartbeat thread calls this on `BEAT_PAUSE` cadence.
# File 'lib/wurk/launcher.rb', line 140
def heartbeat
  flush_stats
  beat
end
#quiet ⇒ Object
Idempotent. Flips `stopping?` true, halts fetching across every Manager + the poller, then fires the `:quiet` event in reverse registration order so teardown hooks run LIFO.
# File 'lib/wurk/launcher.rb', line 99
def quiet
  return if @done

  @done = true
  @managers.each(&:quiet)
  @poller&.terminate
  # The cron poller is intentionally NOT terminated here: a USR1-quieted
  # leader still enqueues periodic jobs — it only stops fetching for itself.
  # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6.
  fire_event(:quiet, reverse: true)
end
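The idempotence guard and the LIFO hook order can be illustrated with a toy class. `SketchLauncher`, its `on` registration method, and its `log` array are hypothetical; the real `fire_event` comes from Component and its internals are not shown in this page.

```ruby
# Hypothetical miniature of the quiet path: a done-flag plus reverse hooks.
class SketchLauncher
  attr_reader :log

  def initialize
    @done = false
    @hooks = Hash.new { |hash, event| hash[event] = [] }
    @log = []
  end

  # Registers a hook; hooks are stored in registration order.
  def on(event, &block)
    @hooks[event] << block
  end

  def stopping?
    @done
  end

  # A second call returns early, so hooks fire at most once.
  def quiet
    return if @done

    @done = true
    fire_event(:quiet, reverse: true)
  end

  private

  def fire_event(event, reverse: false)
    hooks = @hooks[event]
    hooks = hooks.reverse if reverse
    hooks.each { |hook| hook.call(@log) }
  end
end

launcher = SketchLauncher.new
launcher.on(:quiet) { |log| log << :first_registered }
launcher.on(:quiet) { |log| log << :second_registered }

launcher.quiet
launcher.quiet # no effect: the flag is already set

p launcher.log # [:second_registered, :first_registered]
```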
#run(async_beat: true) ⇒ Object
Boot order matters:
1. freeze! the config so mutations after fork are visible mistakes.
2. spawn the heartbeat thread BEFORE the managers so the dashboard
sees the process the moment it can pick up jobs.
3. start the scheduler poller + the cron poller (both leader-gated for
what they enqueue; safe to start before leadership is settled since
a non-leader tick just returns early).
4. start the managers (which start their processors).
5. start the health probe server LAST so the listener doesn't
accept k8s probes until the rest of the launcher is up.
# File 'lib/wurk/launcher.rb', line 76
def run(async_beat: true)
  @started_at = Time.now.to_f
  # Default each capsule's fetcher + materialize its lazy pools/middleware
  # before the config freezes. Every entry point (swarm child, standalone
  # CLI, embedded) runs through here, so none boots with a nil fetcher.
  @config.capsules.each_value(&:prepare!)
  @config.freeze!
  @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat
  @poller&.start
  @leader&.start
  @cron_poller&.start
  @metrics_rollup&.start
  @queue_rollup&.start
  @history&.start
  @managers.each(&:start)
  @reaper.start
  boot_reclaim
  @health_server&.start
end
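Step 1 says freezing makes late mutations "visible mistakes". A minimal sketch of that idea, assuming `freeze!` ends up behaving like Ruby's built-in `Object#freeze` (the implementation of `freeze!` is not shown here), uses a plain Hash as a stand-in for the config:

```ruby
# Plain Hash standing in for the Wurk config; keys are illustrative.
config = { concurrency: 5, timeout: 25 }
config.freeze

# A write after freezing raises instead of silently changing state.
error = begin
  config[:concurrency] = 10
  nil
rescue FrozenError => e
  e
end

puts error.class          # FrozenError
puts config[:concurrency] # 5
```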
#stop ⇒ Object
Graceful shutdown. Deadline is monotonic so wall-clock skew can’t extend it. Managers stop in parallel threads so a slow capsule doesn’t block its siblings.
# File 'lib/wurk/launcher.rb', line 114
def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25)
  quiet
  stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } }
  fire_event(:shutdown, reverse: true)
  stoppers.each(&:join)
  # Full shutdown stops periodic firing (it survived #quiet); do this before
  # releasing the lock so no tick races a follower's promotion.
  @cron_poller&.terminate
  @metrics_rollup&.terminate
  @queue_rollup&.terminate
  @history&.terminate
  @reaper&.stop
  # CAS-release the cluster lock now (planned shutdown) so a follower can
  # take over immediately instead of waiting out the TTL.
  @leader&.stop
  clear_heartbeat
  fire_event(:exit, reverse: true)
end
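The shared monotonic deadline and the parallel stop threads can be sketched in isolation. `SketchManager` and its sleep-based drain are hypothetical; how the real `Manager#stop(deadline)` drains work is not shown on this page. The timeout is shortened to 0.2 seconds so the example finishes quickly.

```ruby
# Monotonic clock reading; unaffected by wall-clock adjustments.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical manager whose drain time is simulated with sleep.
class SketchManager
  attr_reader :drained

  def initialize(drain_seconds)
    @drain_seconds = drain_seconds
    @drained = false
  end

  # Waits for the simulated drain, but never past the shared deadline.
  def stop(deadline)
    budget = [deadline - monotonic_now, 0].max
    sleep([@drain_seconds, budget].min)
    @drained = @drain_seconds <= budget
  end
end

config = { timeout: 0.2 } # the source falls back to 25 when :timeout is unset
managers = [SketchManager.new(0.05), SketchManager.new(0.05), SketchManager.new(5)]

started = monotonic_now
deadline = started + (config[:timeout] || 25)

# One thread per manager, so the slow one does not delay its siblings.
stoppers = managers.map { |m| Thread.new { m.stop(deadline) } }
stoppers.each(&:join)
elapsed = monotonic_now - started

p managers.map(&:drained) # [true, true, false]
```

The two fast managers drain fully while the slow one gives up at the deadline, and the whole stop takes roughly one timeout rather than the sum of the drain times.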
#stopping? ⇒ Boolean
# File 'lib/wurk/launcher.rb', line 134
def stopping?
  @done
end