Class: Wurk::Launcher

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/launcher.rb

Overview

Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler and cron pollers, and the heartbeat thread, along with the leader lock, metrics and queue rollups, history, reaper, and health probe server. The heartbeat WIRE lives in Wurk::Heartbeat: Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.

Lifecycle:

* `run(async_beat:)` — freeze config, start heartbeat, poller, managers.
* `quiet`            — stop fetching across all managers + poller.
* `stop`             — graceful drain inside `config[:timeout]`.
* `heartbeat`        — one-shot beat (also driven by the heartbeat thread).

`flush_stats` rolls per-process Processor counters (PROCESSED / FAILURE / EXPIRED) into the global + per-day Redis strings every beat. Per-day keys carry `STATS_TTL` so old days expire automatically.

Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).

Constant Summary

STATS_TTL =

5 years (365-day years, leap days ignored), in seconds. Per-day `stat:processed:YYYY-MM-DD` / `stat:failed:YYYY-MM-DD` / `stat:expired:YYYY-MM-DD` strings carry this TTL so they roll off without manual cleanup.

5 * 365 * 24 * 60 * 60
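The constant and the per-day key names work out as follows. `per_day_stat_keys` is a hypothetical helper, not part of Wurk; it assumes only the `stat:<kind>:YYYY-MM-DD` naming described above.

```ruby
require 'date'

# 5 years of 365 days, in seconds; leap days are ignored, as in the constant.
STATS_TTL = 5 * 365 * 24 * 60 * 60

# Hypothetical helper: the three per-day stat keys for a given date.
def per_day_stat_keys(date)
  day = date.strftime('%Y-%m-%d')
  %w[processed failed expired].map { |kind| "stat:#{kind}:#{day}" }
end

puts STATS_TTL # 157680000
puts per_day_stat_keys(Date.new(2024, 5, 1)).inspect
# ["stat:processed:2024-05-01", "stat:failed:2024-05-01", "stat:expired:2024-05-01"]
```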
BEAT_PAUSE =

Re-exported for test/third-party callers that read it off Launcher (Sidekiq’s drop-in surface). The single source of truth is Heartbeat.

Heartbeat::BEAT_PAUSE

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config, embedded: false) ⇒ Launcher

Returns a new instance of Launcher.



# File 'lib/wurk/launcher.rb', line 48

def initialize(config, embedded: false)
  @config = config
  @embedded = embedded
  # Two separate flags, deliberately. @done = "quieted" (stop fetching, stay
  # alive, report quiet=true). @stopped = "shutting down" (terminate the
  # heartbeat loop). Quiet must NOT stop the heartbeat — otherwise a quieted
  # process never publishes quiet=true and expires out of the live set (#236).
  @done = false
  @stopped = false
  @managers = config.capsules.values.map { |cap| Manager.new(cap) }
  @poller = build_poller
  @cron_poller = build_cron_poller
  @metrics_rollup = build_metrics_rollup
  @queue_rollup = build_queue_rollup
  @history = build_history
  @leader = build_leader
  @reaper = build_reaper
  @started_at = nil
  @heartbeat = nil
  @heartbeat_thread = nil
  @health_server = build_health_server
end
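The two-flag design in the constructor comment can be modelled in a few lines. `FlagModel` below is a hypothetical toy, not Wurk code; it assumes only what the comment states: `quiet` sets `@done`, full shutdown additionally sets `@stopped`, and only `@stopped` ends the heartbeat loop.

```ruby
# Hypothetical toy model of the Launcher's two flags (not Wurk code).
# @done    = "quieted": stop fetching, keep beating, report quiet=true.
# @stopped = "shutting down": the heartbeat loop may exit.
class FlagModel
  def initialize
    @done = false
    @stopped = false
  end

  def quiet
    @done = true
  end

  def stop
    quiet
    @stopped = true
  end

  def stopping?
    @done
  end

  def heartbeat_running?
    !@stopped
  end

  # What a beat would publish about this process.
  def beat_payload
    { quiet: @done }
  end
end

model = FlagModel.new
model.quiet
puts model.heartbeat_running?   # true: a quieted process keeps beating
puts model.beat_payload[:quiet] # true: and advertises quiet=true
model.stop
puts model.heartbeat_running?   # false
```

If one flag drove both behaviours, `quiet` would also end the loop, the process would stop publishing quiet=true, and it would expire out of the live set, which is the #236 failure the comment guards against.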

Instance Attribute Details

#cron_poller ⇒ Object

Returns the value of attribute cron_poller.



# File 'lib/wurk/launcher.rb', line 46

def cron_poller
  @cron_poller
end

#heartbeat_thread ⇒ Object (readonly)

Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.



# File 'lib/wurk/launcher.rb', line 170

def heartbeat_thread
  @heartbeat_thread
end

#history ⇒ Object

Returns the value of attribute history.



# File 'lib/wurk/launcher.rb', line 46

def history
  @history
end

#managers ⇒ Object

Returns the value of attribute managers.



# File 'lib/wurk/launcher.rb', line 46

def managers
  @managers
end

#metrics_rollup ⇒ Object

Returns the value of attribute metrics_rollup.



# File 'lib/wurk/launcher.rb', line 46

def metrics_rollup
  @metrics_rollup
end

#poller ⇒ Object

Returns the value of attribute poller.



# File 'lib/wurk/launcher.rb', line 46

def poller
  @poller
end

#queue_rollup ⇒ Object

Returns the value of attribute queue_rollup.



# File 'lib/wurk/launcher.rb', line 46

def queue_rollup
  @queue_rollup
end

Instance Method Details

#flush_stats ⇒ Object

Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.



# File 'lib/wurk/launcher.rb', line 154

def flush_stats
  processed = Processor::PROCESSED.reset
  failed = Processor::FAILURE.reset
  expired = Processor::EXPIRED.reset
  return if processed.zero? && failed.zero? && expired.zero?

  write_stats(processed, failed, expired)
rescue StandardError => e
  # Replay-safety: the counters were already reset above, so a Redis blip
  # here drops this beat's counts. We log and accept the loss: per-job
  # at-least-once semantics don't apply to *counters*, and the next beat
  # starts from fresh counts.
  handle_exception(e, { context: 'flush_stats' })
end
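The reset-then-write pattern can be sketched with in-memory stand-ins. `Counter`, `RecordingPipeline`, and this `flush_stats` signature are hypothetical; `write_stats` is not shown on this page, so the exact commands (including the global key names `stat:processed` etc.) are assumptions based on the overview: global plus per-day strings, with per-day keys expiring after `STATS_TTL`.

```ruby
# Hypothetical stand-in for a Processor counter: reset returns the current
# value and zeroes it inside one critical section, so no increment is lost
# between the read and the reset.
class Counter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(by = 1)
    @lock.synchronize { @value += by }
  end

  def reset
    @lock.synchronize do
      current = @value
      @value = 0
      current
    end
  end
end

# Records commands instead of sending them to Redis.
class RecordingPipeline
  attr_reader :commands

  def initialize
    @commands = []
  end

  def incrby(key, amount)
    @commands << [:incrby, key, amount]
  end

  def expire(key, seconds)
    @commands << [:expire, key, seconds]
  end
end

# Hypothetical flush: drain every counter, skip if all are zero, otherwise
# bump the global and per-day totals and (re)set the per-day TTL.
def flush_stats(counters, pipe, day, ttl:)
  counts = counters.transform_values(&:reset)
  return if counts.values.all?(&:zero?)

  counts.each do |kind, n|
    pipe.incrby("stat:#{kind}", n)
    pipe.incrby("stat:#{kind}:#{day}", n)
    pipe.expire("stat:#{kind}:#{day}", ttl)
  end
end

counters = { 'processed' => Counter.new, 'failed' => Counter.new, 'expired' => Counter.new }
counters['processed'].incr(3)
counters['failed'].incr

pipe = RecordingPipeline.new
flush_stats(counters, pipe, '2024-05-01', ttl: 5 * 365 * 24 * 60 * 60)

idle = RecordingPipeline.new
flush_stats(counters, idle, '2024-05-01', ttl: 5 * 365 * 24 * 60 * 60) # all zero now: no writes
```

Because `reset` runs before any write, a failed write loses that beat's counts, which is exactly the trade-off the rescue comment accepts.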

#heartbeat ⇒ Object

One-shot beat. Public for embedded mode (and for tests); the heartbeat thread calls this on a `BEAT_PAUSE` cadence.



# File 'lib/wurk/launcher.rb', line 146

def heartbeat
  flush_stats
  beat
end
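A heartbeat loop of the kind described above can be sketched with a stop flag and an interruptible wait. `BeatLoop` is hypothetical: this page does not show `start_heartbeat`, so using `Mutex` plus `ConditionVariable` (so that shutdown need not wait out a full pause) is an assumed design choice, and the counter increment stands in for `flush_stats` + `beat`.

```ruby
# Hypothetical heartbeat loop: beat, then wait up to `pause` seconds, until
# stop is requested. The wait is interruptible, so stop returns promptly.
class BeatLoop
  attr_reader :beats

  def initialize(pause)
    @pause = pause
    @beats = 0
    @stopped = false
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  def start
    @thread = Thread.new do
      loop do
        break if @lock.synchronize { @stopped }

        @beats += 1 # stand-in for flush_stats + beat, run outside the lock
        # Re-check under the lock so a stop signalled in between is not missed.
        @lock.synchronize { @cond.wait(@lock, @pause) unless @stopped }
      end
    end
  end

  def stop
    @lock.synchronize do
      @stopped = true
      @cond.signal
    end
    @thread.join
  end
end

hb = BeatLoop.new(10) # long pause on purpose
hb.start
sleep 0.05
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
hb.stop # wakes the waiting thread instead of sleeping out the 10 s
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
```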

#quiet ⇒ Object

Idempotent. Flips `stopping?` true, halts fetching across every Manager, terminates the scheduler poller, then fires the `:quiet` event in reverse registration order so teardown hooks run LIFO. The cron poller keeps running until #stop.



# File 'lib/wurk/launcher.rb', line 104

def quiet
  return if @done

  @done = true
  @managers.each(&:quiet)
  @poller&.terminate
  # The cron poller is intentionally NOT terminated here: a USR1-quieted
  # leader still enqueues periodic jobs — it only stops fetching for itself.
  # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6.
  fire_event(:quiet, reverse: true)
end
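The idempotence guard and the LIFO event order can be seen in isolation. `Events` and `QuietModel` are hypothetical stand-ins for `fire_event` and the Launcher; only the `return if @done` guard and the `reverse: true` ordering are taken from the method above.

```ruby
# Hypothetical event registry: handlers run in registration order, or
# last-registered-first when reverse: true.
class Events
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  def on(event, &block)
    @handlers[event] << block
  end

  def fire(event, reverse: false)
    list = reverse ? @handlers[event].reverse : @handlers[event]
    list.each(&:call)
  end
end

class QuietModel
  def initialize(events)
    @events = events
    @done = false
  end

  def quiet
    return if @done # idempotent: a second call is a no-op

    @done = true
    @events.fire(:quiet, reverse: true)
  end
end

log = []
events = Events.new
events.on(:quiet) { log << :first_registered }
events.on(:quiet) { log << :second_registered }

q = QuietModel.new(events)
q.quiet
q.quiet
p log # [:second_registered, :first_registered]
```

LIFO mirrors nested setup: a hook registered later, which may depend on something registered earlier, is torn down first.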

#run(async_beat: true) ⇒ Object

Boot order matters:

1. prepare! each capsule (default its fetcher, materialize its lazy pools
   and middleware), then freeze! the config so mutations after fork are
   visible mistakes.
2. spawn the heartbeat thread (when async_beat is true) BEFORE the managers
   so the dashboard sees the process the moment it can pick up jobs.
3. start the scheduler poller, the leader, and the cron poller (both pollers
   are leader-gated for what they enqueue; safe to start before leadership
   is settled since a non-leader tick just returns early).
4. start the metrics rollup, queue rollup, and history loops.
5. start the managers (which start their processors), then the reaper, then
   run boot_reclaim.
6. start the health probe server LAST so the listener doesn't
   accept k8s probes until the rest of the launcher is up.


# File 'lib/wurk/launcher.rb', line 81

def run(async_beat: true)
  @started_at = Time.now.to_f
  # Default each capsule's fetcher + materialize its lazy pools/middleware
  # before the config freezes. Every entry point (swarm child, standalone
  # CLI, embedded) runs through here, so none boots with a nil fetcher.
  @config.capsules.each_value(&:prepare!)
  @config.freeze!
  @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat
  @poller&.start
  @leader&.start
  @cron_poller&.start
  @metrics_rollup&.start
  @queue_rollup&.start
  @history&.start
  @managers.each(&:start)
  @reaper.start
  boot_reclaim
  @health_server&.start
end
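Step 1 of the boot order relies on freezing, which turns a post-fork mutation into an immediate error rather than a silent divergence. A plain `Hash` stands in for the Wurk config here (assumption; `freeze!` itself is not shown on this page). Ruby's `Object#freeze` is shallow, so nested values stay mutable unless they are frozen as well.

```ruby
# A plain Hash standing in for the Wurk config (assumption).
config = { concurrency: 5, timeout: 25, queues: ['default'] }
config.freeze

begin
  config[:concurrency] = 10 # e.g. a mutation attempted after fork
  mutation_raised = false
rescue FrozenError => e
  mutation_raised = true
  puts "rejected: #{e.class}" # rejected: FrozenError
end

# freeze is shallow: the nested Array is still mutable.
config[:queues] << 'low'
```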

#stop ⇒ Object

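Graceful shutdown. Quiets the process, then stops every Manager against a single deadline of `config[:timeout]` seconds (25 if unset). The deadline is monotonic, so wall-clock skew can't extend it. Managers stop in parallel threads so a slow capsule doesn't block its siblings. Once they finish, the cron poller, rollups, history, and reaper stop, the leader lock is released, and the heartbeat is stopped and cleared before the `:exit` event fires.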



# File 'lib/wurk/launcher.rb', line 119

def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25)
  quiet
  stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } }
  fire_event(:shutdown, reverse: true)
  stoppers.each(&:join)
  # Full shutdown stops periodic firing (it survived #quiet); do this before
  # releasing the lock so no tick races a follower's promotion.
  @cron_poller&.terminate
  @metrics_rollup&.terminate
  @queue_rollup&.terminate
  @history&.terminate
  @reaper&.stop
  # CAS-release the cluster lock now (planned shutdown) so a follower can
  # take over immediately instead of waiting out the TTL.
  @leader&.stop
  stop_heartbeat
  clear_heartbeat
  fire_event(:exit, reverse: true)
end
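The deadline logic in `#stop` can be exercised with fake managers. `SlowManager` and `stop_all` are hypothetical; they mirror only the two properties described above: one deadline taken from `CLOCK_MONOTONIC`, and each manager stopping on its own thread. The slow manager is listed first to show why parallelism matters: stopped sequentially, it would consume the whole budget before the fast ones started.

```ruby
# Hypothetical manager that needs `work` seconds to drain, but never waits
# past the shared deadline.
class SlowManager
  attr_reader :finished

  def initialize(work)
    @work = work
    @finished = false
  end

  def stop(deadline)
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    budget = [@work, remaining].min
    sleep(budget) if budget.positive?
    @finished = @work <= remaining
  end
end

def stop_all(managers, timeout)
  # Monotonic clock: wall-clock jumps cannot stretch or shrink the deadline.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  managers.map { |m| Thread.new { m.stop(deadline) } }.each(&:join)
end

managers = [SlowManager.new(5), SlowManager.new(0.2), SlowManager.new(0.2)]
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
stop_all(managers, 0.5)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
p managers.map(&:finished) # [false, true, true]
```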

#stopping? ⇒ Boolean

True once #quiet has run, whether called directly or via #stop.

Returns:

  • (Boolean)


# File 'lib/wurk/launcher.rb', line 140

def stopping?
  @done
end