Class: Wurk::Launcher
- Inherits:
-
Object
- Object
- Wurk::Launcher
- Includes:
- Component
- Defined in:
- lib/wurk/launcher.rb
Overview
Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler poller, and the heartbeat thread. The heartbeat WIRE lives in Wurk::Heartbeat — Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.
Lifecycle:
* `run(async_beat:)` — freeze config, start heartbeat, poller, managers.
* `quiet` — stop fetching across all managers + poller.
* `stop` — graceful drain inside `config[:timeout]`.
* `heartbeat` — one-shot beat (also driven by the heartbeat thread).
‘flush_stats` rolls per-process Processor counters (PROCESSED / FAILURE / EXPIRED) into the global + per-day Redis strings every beat. Per-day keys carry `STATS_TTL` so old days expire automatically.
Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).
Constant Summary collapse
- STATS_TTL =
5 years, in seconds. Per-day ‘stat:processed:YYYY-MM-DD` / `stat:failed:YYYY-MM-DD` / `stat:expired:YYYY-MM-DD` strings carry this TTL so they roll off without manual cleanup.
5 * 365 * 24 * 60 * 60
- BEAT_PAUSE =
Re-exported for test/third-party callers that read it off Launcher (Sidekiq’s drop-in surface). The single source of truth is Heartbeat.
Heartbeat::BEAT_PAUSE
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary collapse
-
#cron_poller ⇒ Object
Returns the value of attribute cron_poller.
-
#heartbeat_thread ⇒ Object
readonly
Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
-
#managers ⇒ Object
Returns the value of attribute managers.
-
#metrics_rollup ⇒ Object
Returns the value of attribute metrics_rollup.
-
#poller ⇒ Object
Returns the value of attribute poller.
Attributes included from Component
Instance Method Summary collapse
-
#flush_stats ⇒ Object
Rolls in-process Processor counters into Redis.
-
#heartbeat ⇒ Object
One-shot beat.
-
#initialize(config, embedded: false) ⇒ Launcher
constructor
A new instance of Launcher.
-
#quiet ⇒ Object
Idempotent.
-
#run(async_beat: true) ⇒ Object
Boot order matters: 1.
-
#stop ⇒ Object
Graceful shutdown.
- #stopping? ⇒ Boolean
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config, embedded: false) ⇒ Launcher
Returns a new instance of Launcher.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/wurk/launcher.rb', line 46 def initialize(config, embedded: false) @config = config @embedded = @done = false @managers = config.capsules.values.map { |cap| Manager.new(cap) } @poller = build_poller @cron_poller = build_cron_poller @metrics_rollup = build_metrics_rollup @leader = build_leader @reaper = build_reaper @started_at = nil @heartbeat = nil @heartbeat_thread = nil @health_server = build_health_server end |
Instance Attribute Details
#cron_poller ⇒ Object
Returns the value of attribute cron_poller.
44 45 46 |
# File 'lib/wurk/launcher.rb', line 44 def cron_poller @cron_poller end |
#heartbeat_thread ⇒ Object (readonly)
Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
155 156 157 |
# File 'lib/wurk/launcher.rb', line 155 def heartbeat_thread @heartbeat_thread end |
#managers ⇒ Object
Returns the value of attribute managers.
44 45 46 |
# File 'lib/wurk/launcher.rb', line 44 def managers @managers end |
#metrics_rollup ⇒ Object
Returns the value of attribute metrics_rollup.
44 45 46 |
# File 'lib/wurk/launcher.rb', line 44 def metrics_rollup @metrics_rollup end |
#poller ⇒ Object
Returns the value of attribute poller.
44 45 46 |
# File 'lib/wurk/launcher.rb', line 44 def poller @poller end |
Instance Method Details
#flush_stats ⇒ Object
Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.
139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/wurk/launcher.rb', line 139 def flush_stats processed = Processor::PROCESSED.reset failed = Processor::FAILURE.reset expired = Processor::EXPIRED.reset return if processed.zero? && failed.zero? && expired.zero? write_stats(processed, failed, expired) rescue StandardError => e # Replay-safety: counters were reset above, so a Redis blip would # otherwise drop stats. We log and accept — the per-job at-least-once # semantics don't apply to *counters*, and the next beat resets again. handle_exception(e, { context: 'flush_stats' }) end |
#heartbeat ⇒ Object
One-shot beat. Public for embedded mode (and for tests) — the heartbeat thread calls this on ‘BEAT_PAUSE` cadence.
131 132 133 134 |
# File 'lib/wurk/launcher.rb', line 131 def heartbeat flush_stats beat end |
#quiet ⇒ Object
Idempotent. Flips ‘stopping?` true, halts fetching across every Manager + the poller, then fires the `:quiet` event in reverse registration order so teardown hooks run LIFO.
92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/wurk/launcher.rb', line 92 def quiet return if @done @done = true @managers.each(&:quiet) @poller&.terminate # The cron poller is intentionally NOT terminated here: a USR1-quieted # leader still enqueues periodic jobs — it only stops fetching for itself. # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6. fire_event(:quiet, reverse: true) end |
#run(async_beat: true) ⇒ Object
Boot order matters:
1. freeze! the config so mutations after fork are visible mistakes.
2. spawn the heartbeat thread BEFORE the managers so the dashboard
sees the process the moment it can pick up jobs.
3. start the scheduler poller + the cron poller (both leader-gated for
what they enqueue; safe to start before leadership is settled since
a non-leader tick just returns early).
4. start the managers (which start their processors).
5. start the health probe server LAST so the listener doesn't
accept k8s probes until the rest of the launcher is up.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/wurk/launcher.rb', line 72 def run(async_beat: true) @started_at = Time.now.to_f # Default each capsule's fetcher + materialize its lazy pools/middleware # before the config freezes. Every entry point (swarm child, standalone # CLI, embedded) runs through here, so none boots with a nil fetcher. @config.capsules.each_value(&:prepare!) @config.freeze! @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat @poller&.start @leader&.start @cron_poller&.start @metrics_rollup&.start @managers.each(&:start) @reaper.start @health_server&.start end |
#stop ⇒ Object
Graceful shutdown. Deadline is monotonic so wall-clock skew can’t extend it. Managers stop in parallel threads so a slow capsule doesn’t block its siblings.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/wurk/launcher.rb', line 107 def stop deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25) quiet stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } } fire_event(:shutdown, reverse: true) stoppers.each(&:join) # Full shutdown stops periodic firing (it survived #quiet); do this before # releasing the lock so no tick races a follower's promotion. @cron_poller&.terminate @metrics_rollup&.terminate @reaper&.stop # CAS-release the cluster lock now (planned shutdown) so a follower can # take over immediately instead of waiting out the TTL. @leader&.stop clear_heartbeat fire_event(:exit, reverse: true) end |
#stopping? ⇒ Boolean
125 126 127 |
# File 'lib/wurk/launcher.rb', line 125 def stopping? @done end |