Class: Wurk::Fetcher::Reaper
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/fetcher/reaper.rb
Overview
Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).
The Reliable fetcher moves each job from a public queue into a per-process private list (`queue:<public>|<host>|<pid>|<idx>`) and leaves it there until the Processor ACKs it. A SIGKILLed or crashed worker therefore strands its in-flight jobs in private lists that nobody will ever ACK. The Reaper is the recovery half: it periodically scans for private lists whose owning process is gone, then atomically moves their jobs back to the public queue so that a live worker re-runs them.
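The private-list key carries everything the Reaper needs: which public queue a job came from and which process owns it. The sketch below splits a key of the documented shape back into those parts.

Several details are assumptions and are not taken from the Wurk source:

- `parse_private_key`, `public_key_for` and `PrivateList` are hypothetical names.
- Parsing from the right, so that a `|` inside a queue name survives, is an assumption.
- The public list's key is assumed to be `queue:<public>`.

```ruby
# Hypothetical record for the parts of "queue:<public>|<host>|<pid>|<idx>".
PrivateList = Struct.new(:public_queue, :host, :pid, :idx)

# Hypothetical helper. Splits from the right so a "|" inside the public
# queue name stays part of the name; returns nil for non-private keys.
def parse_private_key(key)
  return nil unless key.start_with?("queue:")

  parts = key.delete_prefix("queue:").split("|", -1)
  return nil if parts.size < 4

  idx = Integer(parts.pop, 10, exception: false)
  pid = Integer(parts.pop, 10, exception: false)
  host = parts.pop
  public_q = parts.join("|")
  return nil if idx.nil? || pid.nil? || host.empty? || public_q.empty?

  PrivateList.new(public_q, host, pid, idx)
end

# Assumed key shape of the public list a reclaimed job returns to.
def public_key_for(private_list)
  "queue:#{private_list.public_queue}"
end
```

Usage: `parse_private_key("queue:default|web-1|4242|0")` yields a `PrivateList` with host `"web-1"` and pid `4242`. A plain public key such as `"queue:default"` yields `nil`.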
Liveness is decided per owner:
* same host — the OS is authoritative: `Process.kill(0, pid)`. This
is instant and ignores a stale `processes` SET entry whose 60s TTL
hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
the supervisor reaps it rather than 60s later. (Pid reuse by an
unrelated local process is the one blind spot — the supervisor
respawns with a fresh pid, so it does not arise in practice.)
* other host — we cannot ping the pid, so we trust the heartbeat:
the owner is alive iff some live `processes` member (one whose
`info` hash still exists) shares its `host:pid`. Cross-host reclaim
therefore waits out the 60s heartbeat TTL, exactly as the spec says.
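The two liveness rules above can be sketched as a single predicate.

- Same host: signal 0 performs the kernel's existence and permission checks without delivering anything. `Errno::ESRCH` means the pid is gone. `Errno::EPERM` means it exists but belongs to another user.
- Other host: membership in a set of live `host:pid` strings decides.

`pid_alive?`, `owner_alive?` and the `live_prefixes` argument are hypothetical names. The real class builds its prefixes with a private `live_process_prefixes` whose return shape is not documented here.

```ruby
require "set"

# Same-host rule: Process.kill with signal 0 only checks the target exists.
def pid_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true # process exists but is owned by another user
end

# live_prefixes: hypothetical Set of "host:pid" strings taken from
# `processes` members whose `info` hash still exists.
def owner_alive?(host, pid, local_host:, live_prefixes:)
  if host == local_host
    pid_alive?(pid) # the OS is authoritative on this machine
  else
    live_prefixes.include?("#{host}:#{pid}") # trust the heartbeat
  end
end
```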
Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h. Past the cap, the job is killed into the dead set instead of being re-queued, so a job that crashes its worker every time can’t loop forever.
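The cap is a sliding-window count. The sketch below shows the decision only; it is not the middleware's code.

Several details are assumptions:

- How PoisonPill stores recovery timestamps.
- The exact boundary, taken here to mean that `threshold` recoveries are allowed and one more triggers the kill.
- The names `recovery_action` and `RECOVERY_WINDOW`, which are hypothetical.

The real RECOVERY_THRESHOLD value is not given on this page, so it is passed in as a parameter.

```ruby
RECOVERY_WINDOW = 72 * 60 * 60 # the 72h window, in seconds

# prior_recoveries: epoch seconds of this job's earlier recoveries.
# Returns :kill when this recovery would exceed the cap, else :requeue.
def recovery_action(prior_recoveries, now:, threshold:)
  recent = prior_recoveries.count { |t| now - t < RECOVERY_WINDOW }
  recent >= threshold ? :kill : :requeue
end
```

Recoveries older than 72h drop out of the count. A job that crashes rarely can therefore be recovered indefinitely, while one that crashes on every run is killed quickly.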
SCANs are scoped to the public queues this process serves and gated by a cluster-wide `SET NX EX` lock, so across a fleet only one process sweeps per interval (“1/min within process group” in the spec) and the keyspace touched is bounded to known queues.
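The `SET NX EX` gate needs no release step. Whoever sets the key first owns the interval, and expiry opens the next slot. The sketch below uses an in-memory stand-in, so it runs without a Redis server.

Several details are assumptions:

- The client signature `set(key, value, nx:, ex:)` returning `true`/`false` follows redis-rb. Which client `Component#redis` actually yields is not stated here.
- `FakeLockRedis` and the `owner` value are hypothetical.
- `acquire_lock?` mirrors the private method `#reap` calls, but its real body is not shown.

```ruby
# In-memory stand-in for the one Redis call the lock needs. The clock is
# injected so expiry can be exercised without sleeping.
class FakeLockRedis
  def initialize(clock)
    @clock = clock
    @store = {}
  end

  # Mirrors redis-rb's set(key, value, nx:, ex:): with nx, returns false
  # while an unexpired value exists, otherwise stores it and returns true.
  def set(key, value, nx: false, ex: nil)
    now = @clock.call
    entry = @store[key]
    held = entry && (entry[:expires_at].nil? || now < entry[:expires_at])
    return false if nx && held

    @store[key] = { value: value, expires_at: ex && now + ex }
    true
  end
end

# One winner per interval: the key expires after ttl seconds.
def acquire_lock?(redis, lock_key, ttl, owner)
  redis.set(lock_key, owner, nx: true, ex: ttl) == true
end
```

Making the TTL equal to the sweep interval is what yields "one sweep per interval". A process that loses the race just returns 0 and tries again next tick.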
Spec: docs/target/sidekiq-pro.md §3.2.
Constant Summary
- DEFAULT_INTERVAL = 60
  Sweep cadence in seconds; also the cluster-lock TTL, so exactly one process sweeps per interval. 60s matches the heartbeat TTL, the floor below which cross-host orphans can’t be detected anyway.
- LOCK_KEY = 'super_fetch:reaper'
- SCAN_COUNT = 100
- THREAD_NAME = 'wurk-reaper'
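SCAN_COUNT has no description on this page. It is most likely the COUNT hint passed to Redis SCAN when looking for a queue's private lists, though that is an assumption because the scanning code is private.

SCAN is cursor-driven: callers loop until the server returns cursor `"0"`. MATCH filters each page after it is fetched, so pages can come back empty, and a key may be reported more than once.

The sketch below shows that loop against a hypothetical in-memory `FakeScanRedis`. Its `scan(cursor, match:, count:)` shape follows redis-rb. The `queue:<public>|*` pattern is assumed from the documented key format.

```ruby
# In-memory stand-in that pages through a sorted key list like SCAN does.
class FakeScanRedis
  def initialize(keys)
    @keys = keys.sort
  end

  # Returns [next_cursor, keys]; "0" ends the iteration. MATCH is applied
  # after the page is taken, so a page may contain no matching keys.
  def scan(cursor, match: "*", count: 10)
    start = Integer(cursor)
    page = @keys[start, count] || []
    stop = start + count
    next_cursor = stop >= @keys.size ? 0 : stop
    [next_cursor.to_s, page.select { |k| File.fnmatch(match, k) }]
  end
end

# Collects every private list of one public queue, SCAN_COUNT keys at a time.
def private_lists(redis, public_q, count: 100)
  found = []
  cursor = "0"
  loop do
    cursor, keys = redis.scan(cursor, match: "queue:#{public_q}|*", count: count)
    found.concat(keys)
    break if cursor == "0"
  end
  found.uniq # real SCAN may return the same key twice
end
```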
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
Attributes included from Component
Instance Method Summary
- #initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY) ⇒ Reaper (constructor)
  A new instance of Reaper.
- #reap ⇒ Object
  One cluster-gated sweep: a no-op (returns 0) unless this process wins the interval’s lock.
- #reclaim! ⇒ Object
  One unguarded sweep over every served queue.
- #running? ⇒ Boolean
- #start ⇒ Object
  Spawns the sweep loop.
- #stop ⇒ Object
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY) ⇒ Reaper
Returns a new instance of Reaper.
# File 'lib/wurk/fetcher/reaper.rb', line 56
def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY)
  @config = config
  @interval = interval
  @lock_key = lock_key
  @thread = nil
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
end
Instance Attribute Details
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
# File 'lib/wurk/fetcher/reaper.rb', line 54
def interval
  @interval
end
Instance Method Details
#reap ⇒ Object
One cluster-gated sweep: a no-op (returns 0) unless this process wins the interval’s lock. Used by the loop.
# File 'lib/wurk/fetcher/reaper.rb', line 94
def reap
  return 0 unless acquire_lock?

  reclaim!
end
#reclaim! ⇒ Object
One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.
# File 'lib/wurk/fetcher/reaper.rb', line 103
def reclaim!
  prefixes = live_process_prefixes
  served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
end
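`#reclaim!` sums the per-queue counts returned by the private `reclaim_queue`, which is not shown on this page. The in-memory sketch below illustrates what one such pass does:

1. Find a served queue's private lists.
2. Skip the lists whose owner is alive.
3. Move the rest back to the public list.
4. Count the jobs moved.

Several details are assumptions:

- A Hash of arrays stands in for Redis.
- An `alive` predicate replaces the real `prefixes` argument.
- The key shapes are inferred from the overview.
- `reclaim_all` is a hypothetical name.

The real Reaper moves jobs atomically, and PoisonPill may kill rather than re-queue. Neither behaviour is modelled here.

```ruby
# lists: Hash of key => Array, standing in for Redis lists.
# alive: callable (host, pid) -> Boolean, standing in for the liveness rules.
def reclaim_queue(lists, public_q, alive)
  public_key = "queue:#{public_q}"
  prefix = "#{public_key}|"
  private_keys = lists.keys.select { |k| k.start_with?(prefix) }
  private_keys.sum do |key|
    host, pid, _idx = key.delete_prefix(prefix).split("|")
    next 0 if alive.call(host, Integer(pid)) # owner still running: leave it

    # Orphan: splice its jobs back onto the public list (atomic in Redis).
    jobs = lists.delete(key)
    (lists[public_key] ||= []).concat(jobs)
    jobs.size
  end
end

# Same shape as #reclaim!: one pass over every served queue.
def reclaim_all(lists, served_queues, alive)
  served_queues.sum { |public_q| reclaim_queue(lists, public_q, alive) }
end
```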
#running? ⇒ Boolean
# File 'lib/wurk/fetcher/reaper.rb', line 88
def running?
  !@thread.nil? && @thread.alive?
end
#start ⇒ Object
Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don’t dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.
# File 'lib/wurk/fetcher/reaper.rb', line 69
def start
  @mutex.synchronize do
    return @thread if @thread

    @done = false
    @thread = spawn_loop_thread
  end
  @thread
end
#stop ⇒ Object
# File 'lib/wurk/fetcher/reaper.rb', line 79
def stop
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
  @thread&.join
  @thread = nil
end
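The loop body (`spawn_loop_thread`) is private and not shown. Assuming it pairs the constructor's `@mutex` and `@sleeper` in the usual way, it could look like the sketch below.

- Each pass waits up to one interval on the condition variable, so the first sweep happens only after one interval, as `#start` describes.
- `#stop` sets the flag and signals under the same mutex, so a sleeping loop exits at once instead of finishing its wait.
- Waiting against a monotonic deadline guards against spurious wakeups.

`IntervalLoop` and `wait_one_interval` are hypothetical names, and the block passed in stands in for `#reap`.

```ruby
class IntervalLoop
  def initialize(interval, &on_tick)
    @interval = interval
    @on_tick = on_tick
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @done = false
    @thread = nil
  end

  # Idempotent: a second call returns the running thread.
  def start
    @mutex.synchronize do
      return @thread if @thread

      @done = false
      # Modifier `while` checks first, so the loop waits before its first tick.
      @thread = Thread.new { @on_tick.call while wait_one_interval }
    end
    @thread
  end

  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake a sleeping loop immediately
    end
    @thread&.join
    @thread = nil
  end

  private

  # Sleeps up to @interval; returns false if stop was requested.
  def wait_one_interval
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @interval
    @mutex.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0

        @sleeper.wait(@mutex, remaining) # may wake early; loop re-checks
      end
      !@done
    end
  end
end
```

Checking `@done` under the mutex before waiting closes the race where `#stop` runs before the loop thread first reaches `wait`.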