Class: Wurk::Fetcher::Reaper
- Inherits: Object (Object → Wurk::Fetcher::Reaper)
- Includes: Component
- Defined in: lib/wurk/fetcher/reaper.rb
Overview
Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).
The Reliable fetcher moves each job from a public queue into a per-process private list (`queue:<public>|<host>|<pid>|<idx>`) and leaves it there until the Processor ACKs. A SIGKILLed or crashed worker therefore strands its in-flight jobs in private lists that nobody will ever ACK. The Reaper is the recovery half: it periodically scans for private lists whose owning process is gone and atomically moves their jobs back to the public queue so a live worker re-runs them.
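As a minimal sketch of the key layout described above, a private-list key can be split into its owner fields like this. `PrivateList` and `parse_private_list` are hypothetical names for illustration, not part of the Reaper's API, and the sketch assumes neither the queue name nor the host contains a `|`.

```ruby
# Hypothetical value object for the parts of a private-list key.
PrivateList = Struct.new(:public_queue, :host, :pid, :idx)

# Splits "queue:<public>|<host>|<pid>|<idx>" into its fields.
# Returns nil for anything that is not a private list (e.g. a public queue).
def parse_private_list(key)
  return nil unless key.start_with?('queue:')

  public_q, host, pid, idx = key.delete_prefix('queue:').split('|', 4)
  # A public queue key has no "|" segments, so idx (and pid) would be nil.
  return nil unless idx && pid.match?(/\A\d+\z/)

  PrivateList.new(public_q, host, pid.to_i, idx)
end

list = parse_private_list('queue:default|web-1|4242|0')
puts list.public_queue # the public queue the jobs would be returned to
puts list.pid          # the owning process id, used for the liveness check
```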
Liveness is decided per owner:
* same host — the OS is authoritative: `Process.kill(0, pid)`. This
is instant and ignores a stale `processes` SET entry whose 60s TTL
hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
the supervisor reaps it rather than 60s later. (Pid reuse by an
unrelated local process is the one blind spot — the supervisor
respawns with a fresh pid, so it does not arise in practice.)
* other host — we cannot ping the pid, so we trust the heartbeat:
the owner is alive iff some live `processes` member (one whose
`info` hash still exists) shares its `host:pid`. Cross-host reclaim
therefore waits out the 60s heartbeat TTL, exactly as the spec says.
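The two liveness rules above can be sketched as follows. This is an illustration under stated assumptions, not the Reaper's actual `owner_alive?`: the method names are hypothetical, `live_prefixes` is assumed to be a set of `"host:pid"` strings derived from live `processes` members, and treating `EPERM` as "alive" is this sketch's own choice.

```ruby
require 'set'
require 'socket'

# Same-host check: signal 0 probes for existence without delivering a signal.
def local_pid_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true  # process exists but belongs to another user (assumption: count it as alive)
end

# Hypothetical combined check. `live_prefixes` is assumed to hold the
# "host:pid" of every process whose heartbeat info still exists.
def owner_alive_sketch?(host, pid, live_prefixes, local_host: Socket.gethostname)
  return local_pid_alive?(pid) if host == local_host

  live_prefixes.include?("#{host}:#{pid}")
end

live = Set['web-2:100']
puts owner_alive_sketch?('web-2', 100, live, local_host: 'web-1') # heartbeat present
puts owner_alive_sketch?('web-2', 101, live, local_host: 'web-1') # heartbeat absent
```

Note that the same-host branch never consults `live_prefixes`, which is what lets it ignore a stale heartbeat entry.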
Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h: past the cap the job is killed into the dead set instead of re-queued, so a job that crashes its worker every time can’t loop forever.
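To make the cap concrete, here is a rough in-memory sketch of a "kill after N recoveries within a window" rule. It is not `Wurk::Middleware::PoisonPill` itself: the `RecoveryCap` class, the threshold of 3, and keeping timestamps in a Ruby hash are all assumptions for illustration; the real middleware's storage and the actual value of RECOVERY_THRESHOLD are not shown in this page.

```ruby
# Hypothetical stand-in for the recovery cap, tracking recoveries per job id.
class RecoveryCap
  WINDOW_SECONDS = 72 * 3600 # the 72h window from the description above

  def initialize(threshold:, window: WINDOW_SECONDS)
    @threshold = threshold
    @window = window
    @recoveries = Hash.new { |hash, jid| hash[jid] = [] }
  end

  # Records one recovery and returns :requeue while the job is within the
  # cap, or :kill once it has been recovered more than `threshold` times
  # inside the window.
  def record(jid, now: Time.now.to_f)
    stamps = @recoveries[jid]
    stamps.reject! { |t| t <= now - @window } # forget recoveries outside the window
    stamps << now
    stamps.size > @threshold ? :kill : :requeue
  end
end

cap = RecoveryCap.new(threshold: 3) # 3 is an assumed value, not RECOVERY_THRESHOLD
4.times { |i| puts cap.record('jid-1', now: i.to_f) }
```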
The reaper runs two passes, exactly as super_fetch’s sweeper does:
* a *scoped* sweep every interval ("1/min within process group"): SCANs
only the public queues this process serves, gated by a cluster `SET NX
EX` lock so one process sweeps per interval. The cheap common path.
* a *full* sweep at most once an hour ("full SCAN 1/hr"): SCANs the whole
`queue:*|*` keyspace, gated by its own hourly lock, so private lists
whose public queue no live process serves — a renamed/decommissioned
queue, or a dead host's queue no survivor consumes — are recovered too,
not stranded forever.
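The lock gating for the two passes can be sketched with an in-memory stand-in for Redis `SET key value NX EX ttl`. `FakeStore`, `set_nx_ex`, and `tick` are hypothetical names introduced here so the example runs without a Redis server; only the two lock keys and the 60s/3600s TTLs come from the constants documented on this page.

```ruby
# In-memory stand-in for a store supporting "SET key value NX EX ttl".
class FakeStore
  def initialize
    @data = {}
  end

  # Returns true when the key was set, false when an unexpired key exists.
  def set_nx_ex(key, value, ttl, now:)
    entry = @data[key]
    return false if entry && entry[:expires_at] > now

    @data[key] = { value: value, expires_at: now + ttl }
    true
  end
end

# One loop tick: reports which passes this process would run.
def tick(store, identity, now:, interval: 60, full_interval: 3600)
  passes = []
  passes << :scoped if store.set_nx_ex('super_fetch:reaper', identity, interval, now: now)
  passes << :full if store.set_nx_ex('super_fetch:reaper:full', identity, full_interval, now: now)
  passes
end

store = FakeStore.new
p tick(store, 'proc-a', now: 0)  # first process wins both locks
p tick(store, 'proc-b', now: 30) # both locks still held
p tick(store, 'proc-b', now: 60) # scoped lock expired, hourly lock still held
```

Because each lock's TTL equals its cadence, a process that loses the race simply skips that pass until the key expires.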
Spec: docs/target/sidekiq-pro.md §3.2.
Constant Summary
- DEFAULT_INTERVAL = 60
  Sweep cadence in seconds; also the cluster-lock TTL so exactly one process sweeps per interval. 60s matches the heartbeat TTL, the floor below which cross-host orphans can’t be detected anyway.
- FULL_INTERVAL = 3600
  Full-keyspace sweep cadence and its lock TTL: at most once per hour across the fleet, since a global SCAN is far costlier than the scoped pass.
- LOCK_KEY = 'super_fetch:reaper'
- FULL_LOCK_KEY = 'super_fetch:reaper:full'
- SCAN_COUNT = 100
- THREAD_NAME = 'wurk-reaper'
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
Attributes included from Component
Instance Method Summary
- #initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper (constructor)
  A new instance of Reaper.
- #reap ⇒ Object
  One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock.
- #reclaim! ⇒ Object
  One unguarded sweep over every served queue.
- #reclaim_full! ⇒ Object
  One unguarded full-keyspace sweep: every `queue:*|*` private list, even ones whose public queue this process doesn’t serve.
- #running? ⇒ Boolean
- #start ⇒ Object
  Spawns the sweep loop.
- #stop ⇒ Object
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper
Returns a new instance of Reaper.
    # File 'lib/wurk/fetcher/reaper.rb', line 67
    def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY,
                   full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY)
      @config = config
      @interval = interval
      @lock_key = lock_key
      @full_interval = full_interval
      @full_lock_key = full_lock_key
      @thread = nil
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
    end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
    # File 'lib/wurk/fetcher/reaper.rb', line 65
    def interval
      @interval
    end
Instance Method Details
#reap ⇒ Object
One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock. Returns the total jobs reclaimed across both.
    # File 'lib/wurk/fetcher/reaper.rb', line 109
    def reap
      reclaimed = acquire_lock? ? reclaim! : 0
      reclaimed += reclaim_full! if acquire_full_lock?
      reclaimed
    end
#reclaim! ⇒ Object
One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.
    # File 'lib/wurk/fetcher/reaper.rb', line 118
    def reclaim!
      prefixes = live_process_prefixes
      served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
    end
#reclaim_full! ⇒ Object
One unguarded full-keyspace sweep: every `queue:*|*` private list, even ones whose public queue this process doesn’t serve. Returns the number of jobs reclaimed. Public so boot paths and tests can drive it without the hourly lock.

    # File 'lib/wurk/fetcher/reaper.rb', line 127
    def reclaim_full!
      prefixes = live_process_prefixes
      reclaimed = 0
      each_full_private_list do |key, public_q, host, pid|
        next if owner_alive?(host, pid, prefixes)

        reclaimed += drain(key, public_q)
      end
      reclaimed
    end
#running? ⇒ Boolean
    # File 'lib/wurk/fetcher/reaper.rb', line 102
    def running?
      !@thread.nil? && @thread.alive?
    end
#start ⇒ Object
Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don’t dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.
    # File 'lib/wurk/fetcher/reaper.rb', line 83
    def start
      @mutex.synchronize do
        return @thread if @thread

        @done = false
        @thread = spawn_loop_thread
      end
      @thread
    end
#stop ⇒ Object
    # File 'lib/wurk/fetcher/reaper.rb', line 93
    def stop
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
      @thread&.join
      @thread = nil
    end
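
The body of the loop thread is not shown on this page. As a hedged sketch of how a start/stop pair like the one above can drive an interruptible "wait one interval, then sweep" loop, the following standalone class uses the same Mutex and ConditionVariable pattern. `IntervalLoop` and its private methods are hypothetical; only the `@done`, `@mutex`, and `@sleeper` roles mirror the documented code.

```ruby
# Hypothetical illustration of an interruptible interval loop.
class IntervalLoop
  def initialize(interval, &work)
    @interval = interval
    @work = work
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @done = false
    @thread = nil
  end

  # Idempotent: a second call returns the existing thread.
  def start
    @mutex.synchronize do
      return @thread if @thread

      @done = false
      @thread = Thread.new { run }
    end
  end

  # Wakes the sleeper so the thread exits without waiting out the interval.
  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join
    @thread = nil
  end

  private

  def run
    # Sleep first, so nothing runs until one full interval has elapsed.
    @work.call until sleep_or_done
  end

  # Waits up to one interval; returns true when stop was requested.
  def sleep_or_done
    deadline = monotonic + @interval
    @mutex.synchronize do
      until @done
        remaining = deadline - monotonic
        break if remaining <= 0

        @sleeper.wait(@mutex, remaining) # re-checked in a loop to absorb spurious wakeups
      end
      @done
    end
  end

  def monotonic
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

sweeps = 0
looper = IntervalLoop.new(0.01) { sweeps += 1 }
looper.start
sleep 0.2
looper.stop
puts "sweeps ran: #{sweeps.positive?}"
```

Waiting on the condition variable rather than calling `sleep` is what lets `stop` return promptly even with a 60-second interval.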