Module: Wurk::Middleware::PoisonPill
Defined in: lib/wurk/middleware/poison_pill.rb
Overview
Pro parity (§3.2): poison-pill detection for reliable-fetch orphans. When a job is recovered out of a dead process’s private list, we INCR a per-jid counter at `super_fetch:recovered:<jid>` and (re)set a 72h TTL on it. The recovery that brings the counter to RECOVERY_THRESHOLD (3) is treated as a poison pill: the payload is moved to the dead set, `jobs.poison` is emitted to statsd, and the `on_poison` callbacks fire so operators can be paged. Because the TTL is refreshed on every recovery, this is the third recovery as long as no more than 72h pass between consecutive recoveries.
The counter key TTL is wire-compatible with Sidekiq Pro: third-party tooling that watches `super_fetch:recovered:*` expects 72h.
There is no server-middleware registration: the callers are the reaper / bulk_requeue paths, which drive the lifecycle directly via `track!(payload, queue:)`. When that integration lands, it just calls `PoisonPill.track!` on each orphan it is about to RPUSH back to the public queue.
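The integration described above can be sketched as a reaper step that asks `track!` about each orphan and re-pushes only the ones classified as `:recovered`. This is a minimal sketch under stated assumptions: `FakePoisonPill` and `requeue_orphans` are hypothetical names, the private list and public queue are plain Arrays, and the fake only counts in memory (the real `track!` also moves the poison pill to the dead set).

```ruby
require 'json'

# Hypothetical stand-in for Wurk::Middleware::PoisonPill.track!: counts
# recoveries per jid in memory and returns :poison from the third one on.
module FakePoisonPill
  THRESHOLD = 3
  @counts = Hash.new(0)

  def self.track!(payload, queue: nil)
    jid = JSON.parse(payload)['jid']
    return :recovered if jid.nil? || jid.empty?

    @counts[jid] += 1
    @counts[jid] >= THRESHOLD ? :poison : :recovered
  end
end

# Hypothetical reaper step: drain a dead process's private list and
# re-push every orphan that track! does not classify as a poison pill.
def requeue_orphans(private_list, public_queue, queue_name)
  until private_list.empty?
    payload = private_list.shift
    next if FakePoisonPill.track!(payload, queue: queue_name) == :poison

    public_queue.push(payload) # stands in for RPUSH to the public queue
  end
end

public_queue = []
job = JSON.generate('jid' => 'abc123', 'class' => 'HardJob')
3.times { requeue_orphans([job], public_queue, 'default') }
public_queue.size # => 2 (the third recovery returns :poison and is not re-pushed)
```

Keeping the re-push in the caller matches the contract stated for `track!`: it only classifies and accounts, and never touches the queue itself.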
Constant Summary
- RECOVERY_THRESHOLD = 3
- RECOVERY_TTL = 72 * 60 * 60 (72 hours, in seconds)
- KEY_PREFIX = 'super_fetch:recovered:'
- DEAD_RECORD_LIMIT = 100
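A quick check of what the constants evaluate to and how a per-jid counter key is formed. The constant values are copied from the summary above; `recovery_key` is a hypothetical helper that mirrors the `"#{KEY_PREFIX}#{jid}"` interpolation used by `bump_counter`, `clear!` and `recovery_count`.

```ruby
RECOVERY_THRESHOLD = 3
RECOVERY_TTL = 72 * 60 * 60 # seconds, as passed to Redis EXPIRE
KEY_PREFIX = 'super_fetch:recovered:'

# Hypothetical helper: same interpolation the module uses inline.
def recovery_key(jid)
  "#{KEY_PREFIX}#{jid}"
end

RECOVERY_TTL           # => 259200 (72 hours)
recovery_key('abc123') # => "super_fetch:recovered:abc123"
```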
Class Method Summary
- .bump_counter(jid) ⇒ Object
- .callbacks ⇒ Object
- .clear!(jid) ⇒ Object
  Resets the counter for a jid. Call it after a successful perform so that a job that was recovered twice and then completed doesn’t accumulate state.
- .emit_poison(klass, queue) ⇒ Object
- .emit_recovered_fetch(klass, queue) ⇒ Object
- .fire_callbacks(pill) ⇒ Object
- .mark_poison(payload, job, queue:, count:) ⇒ Object
- .on_poison(&block) ⇒ Object
  Register a callback fired when a poison pill is detected.
- .parse(payload) ⇒ Object
  Internal helper: normalizes a payload into a job Hash.
- .recovery_count(jid) ⇒ Object
  Reads the current recovery counter without bumping it.
- .reset! ⇒ Object
  Test-only reset.
- .track!(payload, queue: nil) ⇒ Symbol
  Called per recovered orphan job.
Class Method Details
.bump_counter(jid) ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 110
def bump_counter(jid)
  key = "#{KEY_PREFIX}#{jid}"
  Wurk.redis do |conn|
    count = conn.call('INCR', key).to_i
    conn.call('EXPIRE', key, RECOVERY_TTL)
    count
  end
end
```
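Because `bump_counter` issues EXPIRE after every INCR, the 72h window slides: each recovery restarts the TTL. The sketch below illustrates that with a hypothetical in-memory `FakeRedis` (not a Redis client; time is advanced by hand) and shows that a job recovered every 71 hours still reaches the threshold.

```ruby
# Hypothetical in-memory stand-in for the two Redis commands bump_counter
# uses. Time is a plain integer advanced manually by the caller.
class FakeRedis
  attr_accessor :now

  def initialize
    @now = 0
    @data = {}
    @expires_at = {}
  end

  def call(cmd, key, *args)
    purge(key)
    case cmd
    when 'INCR' then @data[key] = (@data[key] || 0).to_i + 1
    when 'EXPIRE'
      @expires_at[key] = @now + args.first # TTL restarts from "now"
      1
    else raise ArgumentError, "unsupported command #{cmd}"
    end
  end

  private

  # Drop the key once its TTL has elapsed, as Redis would.
  def purge(key)
    return unless @expires_at[key] && @now >= @expires_at[key]

    @data.delete(key)
    @expires_at.delete(key)
  end
end

RECOVERY_TTL = 72 * 60 * 60

# Same INCR-then-EXPIRE sequence as bump_counter, run against the fake.
def bump_counter(conn, key)
  count = conn.call('INCR', key).to_i
  conn.call('EXPIRE', key, RECOVERY_TTL)
  count
end

conn = FakeRedis.new
key = 'super_fetch:recovered:abc123'
counts = 3.times.map do
  c = bump_counter(conn, key)
  conn.now += 71 * 60 * 60 # the next recovery arrives 71h later
  c
end
counts # => [1, 2, 3]
```

Note also that INCR and EXPIRE are two separate round trips, so the pair is not atomic; if that matters, wrapping them in a MULTI/EXEC transaction is one option.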
.callbacks ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 87
def callbacks
  @callbacks ||= []
end
```
.clear!(jid) ⇒ Object
Resets the counter for a jid. Call it after a successful perform so that a job that was recovered twice and then completed doesn’t accumulate state.
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 71
def clear!(jid)
  return if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('DEL', "#{KEY_PREFIX}#{jid}") }
end
```
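One way a caller might honor "call after a successful perform" is a wrapper that clears the counter only when the job body returns normally. This is a sketch under assumptions: `ClearRecoveryCounter` is hypothetical, its `call(worker, job, queue) { ... }` shape is borrowed from Sidekiq-style server middleware, and `FakePoisonPill.clear!` only records jids (this page states that PoisonPill itself registers no server middleware).

```ruby
# Hypothetical stand-in for PoisonPill.clear!: records the jids it clears.
module FakePoisonPill
  def self.cleared
    @cleared ||= []
  end

  def self.clear!(jid)
    return if jid.nil? || jid.to_s.empty?

    cleared << jid
  end
end

# Hypothetical middleware-style wrapper around a job's perform.
class ClearRecoveryCounter
  def call(_worker, job, _queue)
    result = yield
    # Only reached when perform returned normally; a raise skips the clear.
    FakePoisonPill.clear!(job['jid'])
    result
  end
end

mw = ClearRecoveryCounter.new
mw.call(nil, { 'jid' => 'ok1' }, 'default') { :done }
begin
  mw.call(nil, { 'jid' => 'boom1' }, 'default') { raise 'perform failed' }
rescue RuntimeError
  # the failing job keeps its recovery counter
end
FakePoisonPill.cleared # => ["ok1"]
```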
.emit_poison(klass, queue) ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 133
# (The local variable name was lost in extraction; `tags` is assumed.)
def emit_poison(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.poison', tags: tags.empty? ? nil : tags)
end
```
.emit_recovered_fetch(klass, queue) ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 119
# (The local variable name was lost in extraction; `tags` is assumed.)
def emit_recovered_fetch(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.recovered.fetch', tags: tags.empty? ? nil : tags)
end
```
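`emit_poison` and `emit_recovered_fetch` differ only in the metric name; both send `nil` rather than an empty tag list when neither class nor queue is known. The sketch below pulls that shared logic into a hypothetical `statsd_tags` helper (not part of the module; the `tags` name is likewise assumed) so the result can be inspected without statsd.

```ruby
# Hypothetical helper mirroring the tag-building in emit_poison and
# emit_recovered_fetch.
def statsd_tags(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  tags.empty? ? nil : tags
end

statsd_tags('HardJob', 'default') # => ["class:HardJob", "queue:default"]
statsd_tags(nil, 'default')       # => ["queue:default"]
statsd_tags(nil, nil)             # => nil
```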
.fire_callbacks(pill) ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 140
def fire_callbacks(pill)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    Wurk.configuration.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
  end
end
```
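The `rescue` sits inside the `each` block (valid since Ruby 2.6), so a callback that raises is reported and the remaining callbacks still run. A minimal sketch of that behavior, assuming a hypothetical top-level `handle_exception` that just collects errors in place of `Wurk.configuration.handle_exception`:

```ruby
# Hypothetical stand-in for Wurk.configuration.handle_exception.
HANDLED = []

def handle_exception(error, context:)
  HANDLED << [error.message, context]
end

# Same loop shape as fire_callbacks: each callback is rescued on its own.
def fire_callbacks(callbacks, pill)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
  end
end

seen = []
callbacks = [
  ->(_pill) { raise 'pager down' },
  ->(pill) { seen << pill[:jid] }
]
fire_callbacks(callbacks, { jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default' })
seen    # => ["abc123"]
HANDLED # => [["pager down", "Wurk::Middleware::PoisonPill"]]
```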
.mark_poison(payload, job, queue:, count:) ⇒ Object
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 126
def mark_poison(payload, job, queue:, count:)
  emit_poison(job['class'], queue)
  json = payload.is_a?(String) ? payload : Wurk.dump_json(job)
  Wurk::DeadSet.new.kill(json, notify_failure: false)
  fire_callbacks(jid: job['jid'], klass: job['class'], count: count, queue: queue)
end
```
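`mark_poison` hands the dead set the original String payload when it has one, and only re-serializes when it was given a Hash, so the killed entry keeps the exact bytes that were recovered. A sketch of that choice, assuming `JSON.generate` as a stand-in for `Wurk.dump_json` and a hypothetical `dead_set_payload` helper:

```ruby
require 'json'

# Hypothetical helper mirroring mark_poison's choice of dead-set payload.
# JSON.generate stands in for Wurk.dump_json (an assumption).
def dead_set_payload(payload, job)
  payload.is_a?(String) ? payload : JSON.generate(job)
end

raw = '{"jid":"abc123",  "class":"HardJob"}' # note the extra spaces
job = JSON.parse(raw)

dead_set_payload(raw, job) # returns raw itself, extra spaces included
dead_set_payload(job, job) # => "{\"jid\":\"abc123\",\"class\":\"HardJob\"}"
```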
.on_poison(&block) ⇒ Object
Register a callback fired when a poison pill is detected. Callbacks receive a single Hash `{ jid:, klass:, count:, queue: }`, which matches Sidekiq Pro’s documented shape so existing consumers can be dropped in unchanged.
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 80
def on_poison(&block)
  raise ArgumentError, 'block required' unless block

  callbacks << block
  block
end
```
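A usage sketch for registering a paging callback. To keep it runnable without Wurk, `FakePoisonPill` is a hypothetical copy of just the `on_poison` / `callbacks` registry, and the callbacks are invoked by hand with the Hash shape that `mark_poison` builds.

```ruby
# Hypothetical copy of the on_poison / callbacks registry only.
module FakePoisonPill
  def self.callbacks
    @callbacks ||= []
  end

  def self.on_poison(&block)
    raise ArgumentError, 'block required' unless block

    callbacks << block
    block
  end
end

pages = []
FakePoisonPill.on_poison do |pill|
  # pill is a Hash with :jid, :klass, :count and :queue keys
  pages << "poison pill #{pill[:klass]} (#{pill[:jid]}) after #{pill[:count]} recoveries on #{pill[:queue]}"
end

# Simulate what fire_callbacks would pass after mark_poison.
FakePoisonPill.callbacks.each do |cb|
  cb.call({ jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default' })
end
pages.first # => "poison pill HardJob (abc123) after 3 recoveries on default"
```

Calling `on_poison` without a block raises `ArgumentError` ("block required") rather than silently registering nothing.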
.parse(payload) ⇒ Object
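Internal helper. Normalizes a payload into a job Hash: a Hash is returned as-is, a String is parsed with `Wurk.load_json`, and malformed JSON or any other type yields nil.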
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 98
def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      Wurk.load_json(payload)
    rescue ::JSON::ParserError
      nil
    end
  end
end
```
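The `case` has no `else`, so any payload that is neither a Hash nor a String falls through to `nil`, just like malformed JSON. A self-contained copy that shows each branch, assuming `JSON.parse` as a stand-in for `Wurk.load_json`:

```ruby
require 'json'

# Same case/when shape as parse; JSON.parse stands in for Wurk.load_json.
def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      nil
    end
  end
end

parse({ 'jid' => 'a' }) # returns the same Hash
parse('{"jid":"a"}')    # returns a freshly parsed Hash with "jid" => "a"
parse('not json')       # => nil
parse(42)               # => nil (no matching branch)
```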
.recovery_count(jid) ⇒ Object
Reads the current recovery counter without bumping it. Used by tests and dashboards; returns 0 for jobs that have never been recovered (or whose counter has expired or been cleared).
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 63
def recovery_count(jid)
  return 0 if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('GET', "#{KEY_PREFIX}#{jid}") }.to_i
end
```
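GET returns `nil` for a missing key and a String otherwise; `recovery_count` relies on `to_i` to map both to an Integer (`nil.to_i` is 0). A sketch with `FAKE_STORE`, a hypothetical Hash standing in for Redis:

```ruby
# Hypothetical Hash standing in for Redis; values are Strings as GET returns.
FAKE_STORE = { 'super_fetch:recovered:abc123' => '2' }

def recovery_count(jid)
  return 0 if jid.nil? || jid.to_s.empty?

  FAKE_STORE["super_fetch:recovered:#{jid}"].to_i
end

recovery_count('abc123') # => 2
recovery_count('never')  # => 0
recovery_count(nil)      # => 0
```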
.reset! ⇒ Object
Test-only reset.
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 92
def reset!
  @callbacks = []
end
```
.track!(payload, queue: nil) ⇒ Symbol
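Called per recovered orphan job. Returns `:poison` when the threshold was reached and the job was killed; returns `:recovered` when the job is to be re-pushed (re-pushing is the caller’s responsibility; this method doesn’t touch the queue). `:recovered` is also returned, without touching the counter, when the payload can’t be parsed or has no jid. Emits `jobs.recovered.fetch` on every call with a parseable payload, and `jobs.poison` only on the kill path.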
```ruby
# File 'lib/wurk/middleware/poison_pill.rb', line 43
def track!(payload, queue: nil)
  job = parse(payload)
  return :recovered unless job

  jid = job['jid']
  klass = job['class']
  emit_recovered_fetch(klass, queue)
  return :recovered if jid.nil? || jid.empty?

  count = bump_counter(jid)
  if count >= RECOVERY_THRESHOLD
    mark_poison(payload, job, queue: queue, count: count)
    :poison
  else
    :recovered
  end
end
```
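The branching in `track!` can be exercised end to end with in-memory stand-ins. This is a model, not the implementation: `COUNTS` replaces the Redis counter (with no TTL), and `METRICS` / `KILLED` are hypothetical lists standing in for statsd and the dead set, so only the control flow is faithful.

```ruby
require 'json'

# Hypothetical stand-ins: COUNTS for Redis, METRICS for statsd, KILLED for the dead set.
RECOVERY_THRESHOLD = 3
COUNTS = Hash.new(0)
METRICS = []
KILLED = []

def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      nil
    end
  end
end

def track!(payload, queue: nil)
  job = parse(payload)
  return :recovered unless job # unparseable: no metric, no counter

  jid = job['jid']
  METRICS << 'jobs.recovered.fetch'
  return :recovered if jid.nil? || jid.empty?

  count = (COUNTS[jid] += 1)
  if count >= RECOVERY_THRESHOLD
    METRICS << 'jobs.poison'
    KILLED << jid
    :poison
  else
    :recovered
  end
end

payload = JSON.generate('jid' => 'abc123', 'class' => 'HardJob')
results = 3.times.map { track!(payload, queue: 'default') }
results                               # => [:recovered, :recovered, :poison]
track!('not json')                    # => :recovered, with no metric emitted
METRICS.count('jobs.recovered.fetch') # => 3
```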