Class: Wurk::Launcher

Inherits:
Object
  • Object
Includes:
Component
Defined in:
lib/wurk/launcher.rb

Overview

Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler poller, and the heartbeat thread. The heartbeat wire format lives in Wurk::Heartbeat. Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.

Lifecycle:

* `run(async_beat:)` — freeze config, start heartbeat, poller, managers.
* `quiet`            — stop fetching across all managers + poller.
* `stop`             — graceful drain inside `config[:timeout]`.
* `heartbeat`        — one-shot beat (also driven by the heartbeat thread).

`flush_stats` rolls per-process Processor counters (PROCESSED / FAILURE / EXPIRED) into the global + per-day Redis strings every beat. Per-day keys carry `STATS_TTL` so old days expire automatically.

Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).

Constant Summary

STATS_TTL =

5 years (1,825 days, leap days not counted), in seconds. Per-day `stat:processed:YYYY-MM-DD` / `stat:failed:YYYY-MM-DD` / `stat:expired:YYYY-MM-DD` strings carry this TTL so they roll off without manual cleanup.

5 * 365 * 24 * 60 * 60
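
As a quick check of the arithmetic, the sketch below recomputes the TTL and builds a per-day key in the `stat:<kind>:YYYY-MM-DD` shape described above. `daily_stat_key` is a hypothetical helper used only for illustration; it is not part of Wurk.

```ruby
require 'date'

# Same expression as the constant above: 1,825 days expressed in seconds.
STATS_TTL = 5 * 365 * 24 * 60 * 60

# Hypothetical helper: builds a per-day stat key for a given counter kind.
def daily_stat_key(kind, date)
  "stat:#{kind}:#{date.strftime('%Y-%m-%d')}"
end

puts STATS_TTL                                         # 157680000
puts daily_stat_key(:processed, Date.new(2024, 1, 5))  # stat:processed:2024-01-05
```
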
BEAT_PAUSE =

Re-exported for test/third-party callers that read it off Launcher (Sidekiq’s drop-in surface). The single source of truth is Heartbeat.

Heartbeat::BEAT_PAUSE

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config, embedded: false) ⇒ Launcher

Returns a new instance of Launcher.



# File 'lib/wurk/launcher.rb', line 46

def initialize(config, embedded: false)
  @config = config
  @embedded = embedded
  @done = false
  @managers = config.capsules.values.map { |cap| Manager.new(cap) }
  @poller = build_poller
  @cron_poller = build_cron_poller
  @metrics_rollup = build_metrics_rollup
  @leader = build_leader
  @reaper = build_reaper
  @started_at = nil
  @heartbeat = nil
  @heartbeat_thread = nil
  @health_server = build_health_server
end

Instance Attribute Details

#cron_poller ⇒ Object

Returns the value of attribute cron_poller.



# File 'lib/wurk/launcher.rb', line 44

def cron_poller
  @cron_poller
end

#heartbeat_thread ⇒ Object (readonly)

Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.



# File 'lib/wurk/launcher.rb', line 155

def heartbeat_thread
  @heartbeat_thread
end

#managers ⇒ Object

Returns the value of attribute managers.



# File 'lib/wurk/launcher.rb', line 44

def managers
  @managers
end

#metrics_rollup ⇒ Object

Returns the value of attribute metrics_rollup.



# File 'lib/wurk/launcher.rb', line 44

def metrics_rollup
  @metrics_rollup
end

#poller ⇒ Object

Returns the value of attribute poller.



# File 'lib/wurk/launcher.rb', line 44

def poller
  @poller
end

Instance Method Details

#flush_stats ⇒ Object

Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.



# File 'lib/wurk/launcher.rb', line 139

def flush_stats
  processed = Processor::PROCESSED.reset
  failed = Processor::FAILURE.reset
  expired = Processor::EXPIRED.reset
  return if processed.zero? && failed.zero? && expired.zero?

  write_stats(processed, failed, expired)
rescue StandardError => e
  # Replay-safety: counters were reset above, so if the Redis write fails
  # this beat's counts are lost. We log and accept that: per-job
  # at-least-once semantics don't apply to *counters*, and the next beat
  # resets again.
  handle_exception(e, { context: 'flush_stats' })
end
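
The reset-then-write pattern above can be sketched in isolation. This is a minimal illustration under stated assumptions: `SketchCounter` and `flush_counter` are hypothetical names, and a plain Hash stands in for Redis. It shows why a zero reading can skip the write entirely, and that a reset drains the counter in one step.

```ruby
# Hypothetical stand-in for a Processor counter: thread-safe, and `reset`
# returns the accumulated value while zeroing it in one step.
class SketchCounter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(by = 1)
    @lock.synchronize { @value += by }
  end

  def reset
    @lock.synchronize do
      drained = @value
      @value = 0
      drained
    end
  end
end

# Drains the counter into a store (a Hash here, standing in for Redis).
# Returns false when there was nothing to write.
def flush_counter(counter, store, key)
  drained = counter.reset
  return false if drained.zero?

  store[key] = store.fetch(key, 0) + drained
  true
end

processed = SketchCounter.new
store = {}
3.times { processed.incr }
flush_counter(processed, store, 'stat:processed') # writes 3
flush_counter(processed, store, 'stat:processed') # no-op, counter is empty
```

Because the counter is already zeroed when the write is attempted, a failed write loses that beat's counts, which is the trade-off the rescue comment accepts.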

#heartbeat ⇒ Object

One-shot beat. Public for embedded mode (and for tests); the heartbeat thread calls this on `BEAT_PAUSE` cadence.



# File 'lib/wurk/launcher.rb', line 131

def heartbeat
  flush_stats
  beat
end
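
A fixed-cadence loop that drives a one-shot beat might look like the sketch below. `start_beat_loop` is a hypothetical helper and is not Wurk's `start_heartbeat`, whose body is not shown on this page; the pause value is likewise an arbitrary test value rather than the real `BEAT_PAUSE`.

```ruby
# Hypothetical loop: runs `beat` immediately, then once per `pause` seconds,
# until the shared flag is set. Not Wurk's actual start_heartbeat.
def start_beat_loop(pause, stop, &beat)
  Thread.new do
    until stop[:done]
      beat.call
      sleep pause
    end
  end
end

beats = Queue.new
stop = { done: false }
thread = start_beat_loop(0.01, stop) { beats << :beat }

3.times { beats.pop } # block until three beats have fired
stop[:done] = true
thread.join
```

Keeping the beat itself a plain public method means embedded hosts and tests can call it directly without running the loop at all.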

#quiet ⇒ Object

Idempotent. Flips `stopping?` to true, halts fetching across every Manager and the scheduler poller, then fires the `:quiet` event in reverse registration order so teardown hooks run LIFO.



# File 'lib/wurk/launcher.rb', line 92

def quiet
  return if @done

  @done = true
  @managers.each(&:quiet)
  @poller&.terminate
  # The cron poller is intentionally NOT terminated here: a USR1-quieted
  # leader still enqueues periodic jobs — it only stops fetching for itself.
  # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6.
  fire_event(:quiet, reverse: true)
end
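
The idempotency guard and the LIFO hook order can be seen in a small stand-alone sketch. `QuietSketch` and `on_quiet` are hypothetical names for illustration; Wurk's own `fire_event` comes from Component and is not reproduced here.

```ruby
# Hypothetical miniature of the guard-then-fire pattern used by #quiet.
class QuietSketch
  attr_reader :log

  def initialize
    @done = false
    @hooks = []
    @log = []
  end

  def on_quiet(&hook)
    @hooks << hook
  end

  def quiet
    return if @done # second and later calls do nothing

    @done = true
    @hooks.reverse_each(&:call) # last registered runs first
  end

  def stopping?
    @done
  end
end

sketch = QuietSketch.new
sketch.on_quiet { sketch.log << :first_registered }
sketch.on_quiet { sketch.log << :second_registered }
sketch.quiet
sketch.quiet # no-op
p sketch.log # [:second_registered, :first_registered]
```
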

#run(async_beat: true) ⇒ Object

Boot order matters:

1. freeze! the config so mutations after fork are visible mistakes.
2. spawn the heartbeat thread BEFORE the managers so the dashboard
   sees the process the moment it can pick up jobs.
3. start the scheduler poller + the cron poller (both leader-gated for
   what they enqueue; safe to start before leadership is settled since
   a non-leader tick just returns early).
4. start the managers (which start their processors).
5. start the health probe server LAST so the listener doesn't
   accept k8s probes until the rest of the launcher is up.


# File 'lib/wurk/launcher.rb', line 72

def run(async_beat: true)
  @started_at = Time.now.to_f
  # Default each capsule's fetcher + materialize its lazy pools/middleware
  # before the config freezes. Every entry point (swarm child, standalone
  # CLI, embedded) runs through here, so none boots with a nil fetcher.
  @config.capsules.each_value(&:prepare!)
  @config.freeze!
  @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat
  @poller&.start
  @leader&.start
  @cron_poller&.start
  @metrics_rollup&.start
  @managers.each(&:start)
  @reaper.start
  @health_server&.start
end
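
The ordering idea can be exercised without Wurk. In this sketch, `BootPart` and `boot` are hypothetical, Ruby's core `freeze` stands in for the config's `freeze!`, and a `nil` component models an optional piece that was not configured (hence the `&.` calls).

```ruby
# Hypothetical component that records when it was started.
BootPart = Struct.new(:name, :log) do
  def start
    log << name
  end
end

# Freeze first, then start in a fixed order; nil components are skipped.
def boot(config, heartbeat:, poller:, managers:, health_server:)
  config.freeze
  heartbeat&.start
  poller&.start
  managers.each(&:start)
  health_server&.start
end

log = []
config = { timeout: 25 }
boot(config,
     heartbeat: BootPart.new(:heartbeat, log),
     poller: nil,
     managers: [BootPart.new(:manager_a, log), BootPart.new(:manager_b, log)],
     health_server: BootPart.new(:health_server, log))

p log # [:heartbeat, :manager_a, :manager_b, :health_server]

begin
  config[:timeout] = 5
rescue FrozenError => e
  puts "rejected: #{e.class}" # late mutation fails loudly
end
```
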

#stop ⇒ Object

Graceful shutdown. Deadline is monotonic so wall-clock skew can’t extend it. Managers stop in parallel threads so a slow capsule doesn’t block its siblings.



# File 'lib/wurk/launcher.rb', line 107

def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25)
  quiet
  stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } }
  fire_event(:shutdown, reverse: true)
  stoppers.each(&:join)
  # Full shutdown stops periodic firing (it survived #quiet); do this before
  # releasing the lock so no tick races a follower's promotion.
  @cron_poller&.terminate
  @metrics_rollup&.terminate
  @reaper&.stop
  # CAS-release the cluster lock now (planned shutdown) so a follower can
  # take over immediately instead of waiting out the TTL.
  @leader&.stop
  clear_heartbeat
  fire_event(:exit, reverse: true)
end
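
The shared monotonic deadline and the parallel stop can be sketched on their own. `SlowWorker`, `stop_all`, and `monotonic_now` are hypothetical names; the worker simply sleeps in small steps to imitate draining, and gives up once the deadline passes.

```ruby
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical worker: "drains" in small steps until done or out of time.
class SlowWorker
  attr_reader :finished

  def initialize(steps)
    @steps = steps
    @finished = false
  end

  def stop(deadline)
    @steps.times do
      return if monotonic_now >= deadline

      sleep 0.01
    end
    @finished = true
  end
end

# One deadline is computed up front and handed to every worker, each of
# which stops on its own thread so a slow one cannot delay the others.
def stop_all(workers, timeout)
  deadline = monotonic_now + timeout
  workers.map { |w| Thread.new { w.stop(deadline) } }.each(&:join)
end

workers = [SlowWorker.new(2), SlowWorker.new(1_000)]
stop_all(workers, 0.3)
p workers.map(&:finished)
```

With these values the two-step worker should finish well inside the window, while the thousand-step worker is cut off at the deadline. Because the deadline is an absolute monotonic timestamp rather than a per-worker timeout, the total wait is bounded by roughly one timeout regardless of how many workers there are.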

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/launcher.rb', line 125

def stopping?
  @done
end