Class: Wurk::Fetcher::Reaper

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/fetcher/reaper.rb

Overview

Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).

The reliable fetcher moves each job from a public queue into a per-process private list (`queue:<public>|<host>|<pid>|<idx>`) and leaves it there until the Processor ACKs. A SIGKILLed or crashed worker therefore strands its in-flight jobs in private lists that nobody will ever ACK. The Reaper is the recovery half: it periodically scans for private lists whose owning process is gone and atomically moves their jobs back to the public queue so a live worker re-runs them.
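To make the private-list naming concrete, here is a minimal sketch of building and parsing a key of the form `queue:<public>|<host>|<pid>|<idx>`. The helper names and the `PrivateList` struct are hypothetical and are not part of Wurk's documented API; the sketch also assumes the last three `|`-separated fields are always host, pid, and index.

```ruby
# Hypothetical helpers for illustration; the Reaper's own parsing code is not shown on this page.
PrivateList = Struct.new(:public_queue, :host, :pid, :idx)

# Builds a private-list key in the documented shape.
def private_list_key(public_queue, host, pid, idx)
  "queue:#{public_queue}|#{host}|#{pid}|#{idx}"
end

# Returns a PrivateList, or nil when the key is not a private list
# (for example the public queue key itself).
def parse_private_list_key(key)
  return nil unless key.start_with?('queue:')

  # Assumption: the last three fields are host, pid, idx; anything before
  # them (even if it contains "|") is the public queue name.
  *queue_parts, host, pid, idx = key.delete_prefix('queue:').split('|')
  return nil if queue_parts.empty? || idx.nil?
  return nil unless pid.match?(/\A\d+\z/) && idx.match?(/\A\d+\z/)

  PrivateList.new(queue_parts.join('|'), host, Integer(pid, 10), Integer(idx, 10))
end
```

A sweep could use the parsed `host` and `pid` to decide whether the owner is still alive before touching the list.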

Liveness is decided per owner:

* same host — the OS is authoritative: `Process.kill(0, pid)`. This
  is instant and ignores a stale `processes` SET entry whose 60s TTL
  hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
  the supervisor reaps it rather than 60s later. (Pid reuse by an
  unrelated local process is the one blind spot — the supervisor
  respawns with a fresh pid, so it does not arise in practice.)
* other host — we cannot ping the pid, so we trust the heartbeat:
  the owner is alive iff some live `processes` member (one whose
  `info` hash still exists) shares its `host:pid`. Cross-host reclaim
  therefore waits out the 60s heartbeat TTL, exactly as the spec says.
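The two liveness rules above can be sketched as follows. This is an illustration under assumptions, not the Reaper's actual code: `local_pid_alive?` and `owner_alive?` are hypothetical names, and `live_prefixes` is assumed to be a set of `"host:pid"` strings derived from live `processes` members.

```ruby
require 'set'
require 'socket'

# Same host: signal 0 runs the kernel's existence and permission checks
# without delivering a signal.
def local_pid_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true  # the process exists but belongs to another user
end

# Other host: fall back to the heartbeat-derived set of "host:pid" strings
# (assumed shape; the real data structure is not shown on this page).
def owner_alive?(host, pid, live_prefixes, this_host: Socket.gethostname)
  return local_pid_alive?(pid) if host == this_host

  live_prefixes.include?("#{host}:#{pid}")
end
```

Treating `Errno::EPERM` as "alive" is the conservative choice: a job left in place is reclaimed on a later sweep, whereas a wrongly reclaimed job would run twice.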

Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h: past the cap the job is killed into the dead set instead of re-queued, so a job that crashes its worker every time can’t loop forever.
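As a rough illustration of the recovery cap, the sketch below counts recoveries inside a 72-hour window and decides between re-queueing and killing. It is not Wurk::Middleware::PoisonPill itself: the constant names, the threshold value of 3, and the idea of keeping recovery timestamps in an array are all assumptions made for the example.

```ruby
SKETCH_RECOVERY_THRESHOLD = 3         # hypothetical value; the real RECOVERY_THRESHOLD is not shown here
SKETCH_RECOVERY_WINDOW = 72 * 60 * 60 # 72h in seconds

# recoveries: epoch timestamps (seconds) of this job's earlier recoveries.
# Returns [action, updated_recoveries], where action is :requeue or :kill.
def recovery_decision(recoveries, now: Time.now.to_f)
  # Drop recoveries that fell out of the window, then record this one.
  recent = recoveries.select { |t| now - t < SKETCH_RECOVERY_WINDOW }
  recent << now

  # Up to the threshold the job is re-queued; past it, it goes to the dead set.
  action = recent.size > SKETCH_RECOVERY_THRESHOLD ? :kill : :requeue
  [action, recent]
end
```

With the assumed threshold of 3, the first three recoveries inside the window return `:requeue` and the fourth returns `:kill`.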

SCANs are scoped to the public queues this process serves and gated by a cluster-wide `SET NX EX` lock, so across a fleet only one process sweeps per interval (“1/min within process group” in the spec) and the keyspace touched is bounded to known queues.
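The lock semantics can be shown without a Redis server. The sketch below uses `FakeStore`, a hypothetical in-memory stand-in that mimics only the `SET key value NX EX seconds` behaviour relied on here; the `acquire_lock?` signature is likewise an assumption, since the real private method is not shown on this page. Against a real server, the Redis command is `SET <key> <value> NX EX <ttl>`.

```ruby
# In-memory stand-in for the subset of Redis SET semantics used by the lock.
class FakeStore
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @data = {} # key => [value, expires_at or nil]
  end

  # Mirrors SET with NX/EX: returns true if the key was written,
  # false if NX was requested and an unexpired key already exists.
  def set(key, value, nx: false, ex: nil)
    now = @clock.call
    entry = @data[key]
    @data.delete(key) if entry && entry[1] && entry[1] <= now
    return false if nx && @data.key?(key)

    @data[key] = [value, ex ? now + ex : nil]
    true
  end
end

# Only the first caller within the TTL wins; the key then expires on its
# own, so no explicit unlock is needed.
def acquire_lock?(store, lock_key, owner, ttl)
  store.set(lock_key, owner, nx: true, ex: ttl)
end
```

Because the TTL equals the sweep interval, a process that wins the lock and then dies cannot block sweeps for longer than one interval.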

Spec: docs/target/sidekiq-pro.md §3.2.

Constant Summary

DEFAULT_INTERVAL = 60

Sweep cadence in seconds; also the cluster-lock TTL so exactly one process sweeps per interval. 60s matches the heartbeat TTL, the floor below which cross-host orphans can’t be detected anyway.

LOCK_KEY = 'super_fetch:reaper'
SCAN_COUNT = 100
THREAD_NAME = 'wurk-reaper'

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY) ⇒ Reaper

Returns a new instance of Reaper.



# File 'lib/wurk/fetcher/reaper.rb', line 56

def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY)
  @config = config
  @interval = interval
  @lock_key = lock_key
  @thread = nil
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
end

Instance Attribute Details

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



# File 'lib/wurk/fetcher/reaper.rb', line 54

def interval
  @interval
end

Instance Method Details

#reap ⇒ Object

One cluster-gated sweep: a no-op (returns 0) unless this process wins the interval’s lock. Used by the loop.



# File 'lib/wurk/fetcher/reaper.rb', line 94

def reap
  return 0 unless acquire_lock?

  reclaim!
end

#reclaim! ⇒ Object

One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.



# File 'lib/wurk/fetcher/reaper.rb', line 103

def reclaim!
  prefixes = live_process_prefixes
  served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/fetcher/reaper.rb', line 88

def running?
  !@thread.nil? && @thread.alive?
end

#start ⇒ Object

Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don’t dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.



# File 'lib/wurk/fetcher/reaper.rb', line 69

def start
  @mutex.synchronize do
    return @thread if @thread

    @done = false
    @thread = spawn_loop_thread
  end
  @thread
end

#stop ⇒ Object

Signals the sweep loop to exit, wakes it if it is waiting on the condition variable, and joins the thread.

# File 'lib/wurk/fetcher/reaper.rb', line 79

def stop
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
  @thread&.join
  @thread = nil
end
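The body of `spawn_loop_thread` is not shown on this page. As a minimal sketch of how a wait-first, interruptible sweep loop can be built from a `Mutex` and a `ConditionVariable` in the way `#start` and `#stop` suggest, consider the hypothetical `IntervalLoop` class below; its name and structure are assumptions for illustration only.

```ruby
# Hypothetical illustration of a wait-then-work loop that stop can interrupt.
class IntervalLoop
  def initialize(interval, &work)
    @interval = interval
    @work = work
    @done = false
    @thread = nil
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  # Idempotent: a second call returns the existing thread.
  def start
    @mutex.synchronize do
      return @thread if @thread

      @done = false
      @thread = Thread.new { run }
    end
  end

  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake the loop if it is mid-wait
    end
    @thread&.join
    @thread = nil
  end

  private

  def run
    loop do
      break if wait_or_stop # wait one full interval before each pass

      @work.call
    end
  end

  # Waits up to @interval seconds; returns true if stop was requested.
  # The deadline loop guards against spurious wakeups.
  def wait_or_stop
    deadline = monotonic_now + @interval
    @mutex.synchronize do
      until @done
        remaining = deadline - monotonic_now
        break if remaining <= 0

        @sleeper.wait(@mutex, remaining)
      end
      @done
    end
  end

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

Because the loop checks `@done` under the same mutex that `stop` holds while signalling, a `stop` issued right after `start` returns promptly instead of waiting out the interval.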