Class: Wurk::Fetcher::Reaper

Inherits: Object
Includes: Component
Defined in: lib/wurk/fetcher/reaper.rb

Overview

Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).

The Reliable fetcher moves each job from a public queue into a per-process private list (`queue:<public>|<host>|<pid>|<idx>`) and leaves it there until the Processor ACKs it. A SIGKILLed or crashed worker therefore strands its in-flight jobs in private lists that nobody will ever ACK. The Reaper is the recovery half: it periodically scans for private lists whose owning process is gone and atomically moves their jobs back to the public queue so a live worker re-runs them.
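The owner's host and pid are encoded in the key itself, so the reaper can judge liveness without reading the jobs. The sketch below parses such a key. It assumes exactly the layout above and that host, pid and idx never contain `|`. `PrivateKey` and `parse_private_key` are hypothetical helpers, not Wurk's API.

```ruby
# Hypothetical value object for the four fields of a private-list key.
PrivateKey = Struct.new(:public_queue, :host, :pid, :idx)

# Parses "queue:<public>|<host>|<pid>|<idx>" into a PrivateKey, or returns nil.
# Splits from the right so the last three fields are host, pid and idx; any
# extra "|" is assumed to belong to the public queue name.
def parse_private_key(key)
  return nil unless key.start_with?("queue:")

  *queue_parts, host, pid, idx = key.delete_prefix("queue:").split("|", -1)
  return nil if queue_parts.empty? || host.to_s.empty?
  return nil unless pid.to_s.match?(/\A\d+\z/) && idx.to_s.match?(/\A\d+\z/)

  # to_i rather than Integer() so a leading zero is not read as octal.
  PrivateKey.new(queue_parts.join("|"), host, pid.to_i, idx.to_i)
end

key = parse_private_key("queue:default|web-1|4242|0")
key.public_queue                   # => "default"
key.pid                            # => 4242
parse_private_key("queue:default") # => nil (a public queue, not a private list)
```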

Liveness is decided per owner:

* same host — the OS is authoritative: `Process.kill(0, pid)`. This
  is instant and ignores a stale `processes` SET entry whose 60s TTL
  hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
  the supervisor reaps it rather than 60s later. (Pid reuse by an
  unrelated local process is the one blind spot — the supervisor
  respawns with a fresh pid, so it does not arise in practice.)
* other host — we cannot ping the pid, so we trust the heartbeat:
  the owner is alive iff some live `processes` member (one whose
  `info` hash still exists) shares its `host:pid`. Cross-host reclaim
  therefore waits out the 60s heartbeat TTL, exactly as the spec says.
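The two liveness rules can be expressed as one predicate. This sketch assumes `live_prefixes` is a set of `"host:pid"` strings built from `processes` members whose `info` hash still exists. The `owner_alive?` below is a hypothetical stand-in: it takes an extra `local_host:` keyword and is not the Reaper's private method.

```ruby
require "set"

# Hypothetical liveness check mirroring the two rules above.
# live_prefixes: Set of "host:pid" strings (assumed format) for live processes.
def owner_alive?(host, pid, live_prefixes, local_host:)
  # Other host: trust the heartbeat-backed processes SET.
  return live_prefixes.include?("#{host}:#{pid}") unless host == local_host

  # Same host: signal 0 checks that the pid exists without delivering anything.
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false # no such process: the owner is gone
rescue Errno::EPERM
  true  # the pid exists but belongs to another user
end

owner_alive?("web-1", Process.pid, Set.new, local_host: "web-1")     # => true (ourselves)
owner_alive?("web-2", 77, Set["web-2:77"], local_host: "web-1")      # => true
owner_alive?("web-2", 78, Set["web-2:77"], local_host: "web-1")      # => false
```

The local branch never consults `live_prefixes`, which is what lets a `kill -9`ed sibling be reclaimed before its stale `processes` entry expires.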

Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h: past the cap the job is killed into the dead set instead of re-queued, so a job that crashes its worker every time can’t loop forever.
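Here is a sketch of the cap's arithmetic, under explicit assumptions. Recovery timestamps are kept on the job hash under a hypothetical `recoveries` field. `threshold:` stands in for RECOVERY_THRESHOLD, whose value this page does not give. The real middleware may store its counter differently.

```ruby
RECOVERY_WINDOW = 72 * 3600 # seconds

# Hypothetical bookkeeping: records one recovery on `job` and decides its fate.
# Timestamps older than the 72h window are dropped before counting.
def recovery_verdict(job, threshold:, now: Time.now.to_f)
  recent = Array(job["recoveries"]).select { |t| now - t < RECOVERY_WINDOW }
  recent << now
  job["recoveries"] = recent
  recent.size > threshold ? :dead : :requeue
end

job = { "class" => "CrashyJob" }
t = 1_000_000.0
verdicts = 3.times.map { |i| recovery_verdict(job, threshold: 2, now: t + i) }
verdicts # => [:requeue, :requeue, :dead]
```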

The reaper runs two passes, exactly as super_fetch’s sweeper does:

* a *scoped* sweep every interval ("1/min within process group"): SCANs
  only the public queues this process serves, gated by a cluster `SET NX
  EX` lock so one process sweeps per interval. The cheap common path.
* a *full* sweep at most once an hour ("full SCAN 1/hr"): SCANs the whole
  `queue:*|*` keyspace, gated by its own hourly lock, so private lists
  whose public queue no live process serves — a renamed/decommissioned
  queue, or a dead host's queue no survivor consumes — are recovered too,
  not stranded forever.
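Both locks rely on `SET key value NX EX ttl`. The first writer wins, and the key's expiry reopens the lock after `ttl` seconds. The sketch below reproduces those semantics with an in-memory store and a simulated clock. `FakeLockStore` and `set_nx_ex` are hypothetical. A real deployment would call Redis, which redis-rb spells `redis.set(key, value, nx: true, ex: ttl)`.

```ruby
# In-memory stand-in for Redis SET ... NX EX, driven by an explicit clock
# so expiry is deterministic. Not a Redis client.
class FakeLockStore
  def initialize
    @entries = {} # key => { value:, expires_at: }
  end

  # true if the key was set; false if it exists and has not yet expired.
  def set_nx_ex(key, value, ttl, now:)
    entry = @entries[key]
    return false if entry && entry[:expires_at] > now

    @entries[key] = { value: value, expires_at: now + ttl }
    true
  end
end

store = FakeLockStore.new
# Three processes race for the 60s scoped lock at t=0, t=30 and t=61.
winners = [0, 30, 61].map do |t|
  %w[p1 p2 p3].select { |id| store.set_nx_ex("super_fetch:reaper", id, 60, now: t) }
end
winners # => [["p1"], [], ["p1"]]
```

At most one process wins each window. Nothing pins the sweep to a particular process; whoever asks first after expiry gets it.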

Spec: docs/target/sidekiq-pro.md §3.2.

Constant Summary

DEFAULT_INTERVAL = 60

Sweep cadence in seconds; also the cluster-lock TTL, so at most one process sweeps per interval. 60s matches the heartbeat TTL, the floor below which cross-host orphans can't be detected anyway.

FULL_INTERVAL = 3600

Full-keyspace sweep cadence and its lock TTL: at most once per hour across the fleet, since a global SCAN is far costlier than the scoped pass.

LOCK_KEY = 'super_fetch:reaper'
FULL_LOCK_KEY = 'super_fetch:reaper:full'
SCAN_COUNT = 100
THREAD_NAME = 'wurk-reaper'

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper

Returns a new instance of Reaper.



# File 'lib/wurk/fetcher/reaper.rb', line 67

def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY,
               full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY)
  @config = config
  @interval = interval
  @lock_key = lock_key
  @full_interval = full_interval
  @full_lock_key = full_lock_key
  @thread = nil
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
end

Instance Attribute Details

#interval ⇒ Object (readonly)

Returns the sweep cadence in seconds: the interval: keyword passed to the constructor, DEFAULT_INTERVAL unless overridden.



# File 'lib/wurk/fetcher/reaper.rb', line 65

def interval
  @interval
end

Instance Method Details

#reap ⇒ Object

One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock. Returns the total jobs reclaimed across both.



# File 'lib/wurk/fetcher/reaper.rb', line 109

def reap
  reclaimed = acquire_lock? ? reclaim! : 0
  reclaimed += reclaim_full! if acquire_full_lock?
  reclaimed
end
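The model below runs one tick per minute for two simulated hours. It assumes a single process and locks that behave like `SET NX EX`. `ReapTick` is hypothetical and both sweeps are stubbed. It shows the cadence: the scoped pass runs on every tick, and the full pass runs only when the hourly lock has expired.

```ruby
# Hypothetical model of #reap's control flow. Locks are key => expiry time
# on a simulated clock; the sweeps are injected lambdas.
class ReapTick
  def initialize(interval: 60, full_interval: 3600)
    @interval = interval
    @full_interval = full_interval
    @locks = {}
  end

  def reap(now, scoped_sweep:, full_sweep:)
    reclaimed = acquire?("super_fetch:reaper", @interval, now) ? scoped_sweep.call : 0
    reclaimed += full_sweep.call if acquire?("super_fetch:reaper:full", @full_interval, now)
    reclaimed
  end

  private

  # SET NX EX semantics: succeed only if the key is absent or expired.
  def acquire?(key, ttl, now)
    return false if @locks.fetch(key, -Float::INFINITY) > now

    @locks[key] = now + ttl
    true
  end
end

scoped = 0
full = 0
tick = ReapTick.new
(0...120).each do |minute|
  tick.reap(minute * 60,
            scoped_sweep: -> { scoped += 1; 0 },
            full_sweep: -> { full += 1; 0 })
end
[scoped, full] # => [120, 2]
```

As in `reap` above, the full lock is tried even when the scoped lock was lost. The hourly pass therefore does not depend on winning the per-minute race.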

#reclaim! ⇒ Object

One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.



# File 'lib/wurk/fetcher/reaper.rb', line 118

def reclaim!
  prefixes = live_process_prefixes
  served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
end
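The scoped and full passes differ in which private lists they visit. This sketch computes the two candidate sets. It assumes the scoped pass matches `queue:<public>|*` for each served queue; `reclaim_queue` is not shown, so that pattern is an assumption. Redis `SCAN MATCH` is approximated here with `File.fnmatch` over an in-memory key list. Against a real server this would more likely be redis-rb's `scan_each(match: ..., count: SCAN_COUNT)`.

```ruby
# Hypothetical keyspace snapshot; only key names matter for candidate selection.
keys = [
  "queue:default",             # a public queue, never a reclaim candidate
  "queue:default|web-1|111|0", # private list under a served queue
  "queue:legacy|web-2|222|0",  # private list under a queue nobody serves
]

# Approximates SCAN MATCH. Equivalent for these plain `*` patterns, but not a
# full Redis glob implementation.
def scan_match(keys, pattern)
  keys.select { |k| File.fnmatch(pattern, k) }
end

# Scoped pass: one MATCH per public queue this process serves.
def scoped_candidates(keys, served_queues)
  served_queues.flat_map { |q| scan_match(keys, "queue:#{q}|*") }
end

# Full pass: every private list, whatever its public queue.
def full_candidates(keys)
  scan_match(keys, "queue:*|*")
end

scoped_candidates(keys, %w[default]) # => ["queue:default|web-1|111|0"]
full_candidates(keys)                # => ["queue:default|web-1|111|0", "queue:legacy|web-2|222|0"]
```

The `legacy` list is the case the full sweep exists for: no served queue's pattern reaches it.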

#reclaim_full! ⇒ Object

One unguarded full-keyspace sweep: every `queue:*|*` private list, including ones whose public queue this process doesn't serve. Returns the number of jobs reclaimed. Public so boot paths and tests can drive it without the hourly lock.



# File 'lib/wurk/fetcher/reaper.rb', line 127

def reclaim_full!
  prefixes = live_process_prefixes
  reclaimed = 0
  each_full_private_list do |key, public_q, host, pid|
    next if owner_alive?(host, pid, prefixes)

    reclaimed += drain(key, public_q)
  end
  reclaimed
end
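`drain(key, public_q)` is private and not shown. The in-memory sketch below shows what it has to do. It assumes each job moves with one atomic Redis step, RPOPLPUSH or its Redis 6.2+ replacement `LMOVE src dst RIGHT LEFT`, so a crash mid-drain leaves every job in exactly one of the two lists. The list direction and the `lists` hash are assumptions, and the PoisonPill kill decision is left out.

```ruby
# Hypothetical in-memory drain: move every job from a private list back to its
# public queue one element at a time and return how many moved.
def drain(lists, private_key, public_queue)
  moved = 0
  src = lists.fetch(private_key, [])
  dst = (lists[public_queue] ||= [])
  until src.empty?
    dst.unshift(src.pop) # pop the right end of src, push onto the left of dst
    moved += 1
  end
  lists.delete(private_key) # Redis drops empty lists automatically
  moved
end

lists = { "queue:default" => ["j0"], "queue:default|web-1|111|0" => ["j1", "j2"] }
moved = drain(lists, "queue:default|web-1|111|0", "queue:default")
moved                  # => 2
lists["queue:default"] # => ["j1", "j2", "j0"]
```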

#running? ⇒ Boolean

Returns true while the sweep thread exists and is still alive.

Returns:

  • (Boolean)


# File 'lib/wurk/fetcher/reaper.rb', line 102

def running?
  !@thread.nil? && @thread.alive?
end

#start ⇒ Object

Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don’t dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.



# File 'lib/wurk/fetcher/reaper.rb', line 83

def start
  @mutex.synchronize do
    return @thread if @thread

    @done = false
    @thread = spawn_loop_thread
  end
  @thread
end

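#stop ⇒ Object

Sets the done flag and signals the loop's condition variable, so a sleeping loop wakes instead of waiting out the interval. It then joins the thread and clears it, so a later #start spawns a fresh one. Safe to call on a reaper that was never started.
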


# File 'lib/wurk/fetcher/reaper.rb', line 93

def stop
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
  @thread&.join
  @thread = nil
end
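`start` and `stop` imply a loop that waits one interval before each sweep and wakes early when `stop` signals `@sleeper`. `spawn_loop_thread` is not shown, so the hypothetical `SweepLoop` class below is a self-contained sketch of that pattern. After every wakeup it re-checks a monotonic deadline, so a spurious `ConditionVariable` wakeup does not trigger an early sweep.

```ruby
# Hypothetical sweep loop: waits one interval before each sweep; stop cuts any
# wait short. Not Wurk's spawn_loop_thread.
class SweepLoop
  def initialize(interval, &tick)
    @interval = interval
    @tick = tick
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
  end

  def start
    @mutex.synchronize do
      return @thread if @thread # idempotent

      @done = false
      @thread = Thread.new { run }
    end
  end

  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join
    @thread = nil
  end

  private

  def run
    until sleep_interval_or_stop
      @tick.call
    end
  end

  # Returns false once the interval has elapsed, true if stop was requested.
  def sleep_interval_or_stop
    deadline = now + @interval
    @mutex.synchronize do
      until @done
        remaining = deadline - now
        return false if remaining <= 0

        @sleeper.wait(@mutex, remaining)
      end
      true
    end
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

Calling `stop` straight after `start` returns promptly with zero sweeps. The `#start` note relies on this property so that an un-stopped launcher in a unit test never touches the keyspace.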