Module: Wurk::Middleware::PoisonPill
- Defined in:
- lib/wurk/middleware/poison_pill.rb
Overview
Pro parity (§3.2): poison-pill detection for reliable-fetch orphans. When a job is recovered from a dead process’s private list, we INCR a per-jid counter at `super_fetch:recovered:<jid>` with a 72h TTL. The recovery that brings the counter to RECOVERY_THRESHOLD (3) is treated as a poison pill: the payload is moved to the dead set, `jobs.poison` is emitted to statsd, and the poison callbacks (plus the Pro `super_fetch!` callback, if one is registered) fire so operators can be paged.
The counter key's TTL is wire-compatible with Sidekiq Pro: third-party tooling that watches `super_fetch:recovered:*` expects 72h.
No server-middleware registration: callers are the reaper / bulk_requeue paths, which drive the lifecycle directly via `track!(payload, queue:)`. When that integration lands, it calls `PoisonPill.track!` on each orphan it is about to RPUSH back to the public queue, and re-pushes only the ones for which `track!` returns `:recovered`.
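On the reaper side, the integration described above might look roughly like the following. Everything here is hypothetical: `requeue_orphans` is not a Wurk method. The `tracker` and `push` lambdas stand in for `PoisonPill.track!` and the RPUSH, so the sketch runs without Redis.

```ruby
# Hypothetical reaper-side integration: `tracker` stands in for
# PoisonPill.track! and `push` for the RPUSH back to the public queue.
def requeue_orphans(orphans, queue:, tracker:, push:)
  orphans.each_with_object(Hash.new(0)) do |payload, tally|
    outcome = tracker.call(payload, queue: queue)
    # Only :recovered jobs go back on the queue; :poison jobs were
    # already moved to the dead set by track!.
    push.call(queue, payload) if outcome == :recovered
    tally[outcome] += 1
  end
end

pushed = []
tracker = ->(payload, queue:) { payload.include?('bad') ? :poison : :recovered }
push = ->(queue, payload) { pushed << [queue, payload] }

tally = requeue_orphans(%w[job-1 bad-job job-2], queue: 'default', tracker: tracker, push: push)
p tally[:poison] # => 1
p pushed         # => [["default", "job-1"], ["default", "job-2"]]
```

The caller owns the queue write because `track!` deliberately never touches the queue.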
Defined Under Namespace
Classes: Pill
Constant Summary
- RECOVERY_THRESHOLD = 3
- RECOVERY_TTL = 72 * 60 * 60 (259,200 seconds, i.e. 72h)
- KEY_PREFIX = 'super_fetch:recovered:'
- DEAD_RECORD_LIMIT = 100
Class Method Summary
- .bump_counter(jid) ⇒ Object
- .callbacks ⇒ Object
- .clear!(jid) ⇒ Object
  Resets the counter for a jid; call it after a successful perform so a job that recovered twice and then completed doesn’t accumulate state.
- .emit_poison(klass, queue) ⇒ Object
- .emit_recovered_fetch(klass, queue) ⇒ Object
- .fire_callbacks(pill) ⇒ Object
- .fire_super_fetch(config, payload, pill) ⇒ Object
  Invoke the Pro recovery callback registered via config.super_fetch! { }.
- .mark_poison(payload, job, queue:, count:) ⇒ Object
- .on_poison(&block) ⇒ Object
  Register a callback fired when a poison pill is detected.
- .parse(payload) ⇒ Object
  Internal helper: normalizes a payload into a job Hash.
- .recovery_count(jid) ⇒ Object
  Reads the current recovery counter without bumping it.
- .reset! ⇒ Object
  Test-only reset.
- .track!(payload, queue: nil, config: Wurk.configuration) ⇒ Symbol
  Called per recovered orphan job.
Class Method Details
.bump_counter(jid) ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 127
def bump_counter(jid)
  key = "#{KEY_PREFIX}#{jid}"
  Wurk.redis do |conn|
    count = conn.call('INCR', key).to_i
    # Each bump refreshes the TTL, so the key expires 72h after the latest recovery.
    conn.call('EXPIRE', key, RECOVERY_TTL)
    count
  end
end
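The sketch below shows what the INCR-then-EXPIRE pair implies. It runs against a hypothetical in-memory connection (`FakeConn`, not part of Wurk or any Redis client) that mimics only the `call('INCR' | 'EXPIRE' | 'GET', ...)` commands used above and has a manually advanced clock. Because every bump resets the TTL, the counter expires 72h after the most recent recovery, not the first.

```ruby
# Hypothetical stand-in for a Redis connection: supports only the
# INCR / EXPIRE / GET commands used by bump_counter, with a manual clock.
class FakeConn
  attr_accessor :now

  def initialize
    @now = 0
    @values = {}
    @expires_at = {}
  end

  def call(cmd, key, *args)
    purge(key)
    case cmd
    when 'INCR'
      @values[key] = @values.fetch(key, 0) + 1
    when 'EXPIRE'
      @expires_at[key] = @now + args.first
      1
    when 'GET'
      @values[key]&.to_s
    else
      raise ArgumentError, "unsupported command #{cmd}"
    end
  end

  private

  # Drop the key once its TTL has elapsed.
  def purge(key)
    return unless @expires_at.key?(key) && @now >= @expires_at[key]

    @values.delete(key)
    @expires_at.delete(key)
  end
end

RECOVERY_TTL = 72 * 60 * 60
KEY = 'super_fetch:recovered:abc123'

# Same two-step sequence as bump_counter, minus the Wurk.redis wrapper.
def bump(conn, key)
  count = conn.call('INCR', key).to_i
  conn.call('EXPIRE', key, RECOVERY_TTL)
  count
end

conn = FakeConn.new
bump(conn, KEY)          # t = 0h, count 1
conn.now = 60 * 60 * 60  # t = 60h
bump(conn, KEY)          # count 2; TTL now runs until t = 132h
conn.now = 100 * 60 * 60 # t = 100h, already past 72h from the first bump
p conn.call('GET', KEY)  # => "2"
conn.now = 132 * 60 * 60 # t = 132h, 72h after the last bump
p conn.call('GET', KEY)  # => nil
```

The two commands are not atomic. A crash between them would leave a counter without a TTL. Wrapping them in MULTI/EXEC would close that gap if it ever matters.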
.callbacks ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 104
def callbacks
  @callbacks ||= []
end
.clear!(jid) ⇒ Object
Resets the counter for a jid — call after a successful perform so a job that recovered twice and then completed doesn’t accumulate state.
# File 'lib/wurk/middleware/poison_pill.rb', line 88
def clear!(jid)
  return if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('DEL', "#{KEY_PREFIX}#{jid}") }
end
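This sketch shows the lifecycle clear! exists for. A job recovered twice that then completes should start from zero if it is ever orphaned again. The Hash-backed store and the `after_perform` hook below are hypothetical. In Wurk the counter lives in Redis, and the call site for clear! is up to the integration.

```ruby
# Hypothetical Hash-backed counter store illustrating why clear! matters.
KEY_PREFIX = 'super_fetch:recovered:'
RECOVERY_THRESHOLD = 3
store = Hash.new(0)

bump  = ->(jid) { store["#{KEY_PREFIX}#{jid}"] += 1 }
clear = ->(jid) { store.delete("#{KEY_PREFIX}#{jid}") unless jid.to_s.empty? }

# Hypothetical hook: reset the counter once a perform succeeds.
after_perform = ->(jid) { clear.call(jid) }

2.times { bump.call('abc123') } # orphaned and recovered twice
after_perform.call('abc123')    # then completes successfully

# A later orphaning starts over at 1 instead of reaching the threshold.
count = bump.call('abc123')
p count                       # => 1
p count >= RECOVERY_THRESHOLD # => false
```

Without the reset, that third bump would return 3 and the job would be killed even though it had completed in between.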
.emit_poison(klass, queue) ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 150
def emit_poison(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.poison', tags: tags.empty? ? nil : tags)
end
.emit_recovered_fetch(klass, queue) ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 136
def emit_recovered_fetch(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.recovered.fetch', tags: tags.empty? ? nil : tags)
end
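Both emitters build the same optional tag list. The sketch below pulls that logic out as a pure function (`statsd_tags` is a hypothetical name, not a Wurk method). It adds `class:` and `queue:` tags only when the value is present, and passes `nil` rather than an empty array when neither is.

```ruby
# Hypothetical extraction of the tag-building logic shared by
# emit_poison and emit_recovered_fetch.
def statsd_tags(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  tags.empty? ? nil : tags
end

p statsd_tags('HardWorker', 'default') # => ["class:HardWorker", "queue:default"]
p statsd_tags(nil, 'critical')         # => ["queue:critical"]
p statsd_tags(nil, nil)                # => nil
```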
.fire_callbacks(pill) ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 157
def fire_callbacks(pill)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    Wurk.configuration.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
  end
end
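fire_callbacks rescues inside the block, so each callback is isolated. One failing subscriber cannot prevent the others from running. The minimal sketch below shows that behavior. The callback list and the `handle_exception` lambda are hypothetical stand-ins for the registered callbacks and Wurk.configuration.handle_exception.

```ruby
# Hypothetical stand-ins for the registered callbacks and the error handler.
errors = []
handle_exception = ->(e, context:) { errors << [e.message, context] }

seen = []
callbacks = [
  ->(pill) { seen << "pager:#{pill[:jid]}" },
  ->(_pill) { raise 'webhook down' },
  ->(pill) { seen << "log:#{pill[:count]}" }
]

# Same shape as fire_callbacks: the rescue sits inside the block, so it
# applies to each callback individually.
callbacks.each do |cb|
  cb.call({ jid: 'abc123', klass: 'HardWorker', count: 3, queue: 'default' })
rescue StandardError => e
  handle_exception.call(e, context: 'Wurk::Middleware::PoisonPill')
end

p seen   # => ["pager:abc123", "log:3"]
p errors # => [["webhook down", "Wurk::Middleware::PoisonPill"]]
```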
.fire_super_fetch(config, payload, pill) ⇒ Object
Invoke the Pro recovery callback registered via config.super_fetch! { }. No-op unless one is registered. `jobstr` is the raw job JSON, so a Pro `|jobstr, pill|` block sees exactly what Sidekiq Pro hands it.
# File 'lib/wurk/middleware/poison_pill.rb', line 168
def fire_super_fetch(config, payload, pill)
  cb = config.super_fetch_callback
  return unless cb

  jobstr = payload.is_a?(::String) ? payload : Wurk.dump_json(payload)
  cb.call(jobstr, pill)
rescue StandardError => e
  config.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
end
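Because of the jobstr normalization, a Pro-style block always receives a JSON String, whether `track!` was handed a raw String or an already-parsed Hash. In the sketch below, `invoke_super_fetch` is a hypothetical helper, and the stdlib `JSON.generate` stands in for `Wurk.dump_json`. That substitution is an assumption: Wurk's dumper may format output differently.

```ruby
require 'json'

# Hypothetical helper mirroring fire_super_fetch's normalization step;
# JSON.generate stands in for Wurk.dump_json.
def invoke_super_fetch(callback, payload, pill)
  return unless callback

  jobstr = payload.is_a?(::String) ? payload : JSON.generate(payload)
  callback.call(jobstr, pill)
end

received = []
# Shaped like a Pro block: |jobstr, pill|
callback = ->(jobstr, pill) { received << [jobstr.class, pill] }

invoke_super_fetch(callback, '{"jid":"abc123"}', nil)
invoke_super_fetch(callback, { 'jid' => 'abc123' }, nil)
invoke_super_fetch(nil, { 'jid' => 'abc123' }, nil) # no callback registered: no-op

p received # => [[String, nil], [String, nil]]
```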
.mark_poison(payload, job, queue:, count:) ⇒ Object
# File 'lib/wurk/middleware/poison_pill.rb', line 143
def mark_poison(payload, job, queue:, count:)
  emit_poison(job['class'], queue)
  json = payload.is_a?(String) ? payload : Wurk.dump_json(job)
  Wurk::DeadSet.new.kill(json, notify_failure: false)
  fire_callbacks(jid: job['jid'], klass: job['class'], count: count, queue: queue)
end
.on_poison(&block) ⇒ Object
Register a callback fired when a poison pill is detected. Callbacks receive a single Hash with the keys `jid:`, `klass:`, `count:` and `queue:` (the shape built by #mark_poison). This matches Sidekiq Pro’s documented shape, so consumers can drop in unchanged.
# File 'lib/wurk/middleware/poison_pill.rb', line 97
def on_poison(&block)
  raise ArgumentError, 'block required' unless block

  callbacks << block
  block
end
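Registering a subscriber might look like the following. `Registry` is a hypothetical, self-contained re-creation of on_poison plus a stripped-down fire step, so the example runs without Wurk. The Hash it delivers uses the same keys that #mark_poison passes.

```ruby
# Hypothetical re-creation of the on_poison / fire_callbacks pair.
module Registry
  def self.callbacks
    @callbacks ||= []
  end

  def self.on_poison(&block)
    raise ArgumentError, 'block required' unless block

    callbacks << block
    block
  end

  def self.fire(pill)
    callbacks.each { |cb| cb.call(pill) }
  end
end

alerts = []
Registry.on_poison do |pill|
  alerts << "#{pill[:klass]} (#{pill[:jid]}) recovered #{pill[:count]}x on #{pill[:queue]}"
end

# Same Hash shape that mark_poison builds.
Registry.fire(jid: 'abc123', klass: 'HardWorker', count: 3, queue: 'default')
p alerts # => ["HardWorker (abc123) recovered 3x on default"]

begin
  Registry.on_poison
rescue ArgumentError => e
  p e.message # => "block required"
end
```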
.parse(payload) ⇒ Object
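Internal helper: normalizes a payload into a job Hash. A Hash is returned as-is, a String is parsed as JSON, and invalid JSON (or any other type) yields nil.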
# File 'lib/wurk/middleware/poison_pill.rb', line 115
def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      Wurk.load_json(payload)
    rescue ::JSON::ParserError
      nil
    end
  end
end
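The sketch below shows the possible outcomes of parse. `parse_payload` is a hypothetical name, and the stdlib `JSON.parse` stands in for `Wurk.load_json`. That substitution is an assumption: if Wurk wraps a different JSON library, its parse error must still be a `JSON::ParserError` for the rescue to apply.

```ruby
require 'json'

# Hypothetical copy of parse with JSON.parse in place of Wurk.load_json.
def parse_payload(payload)
  case payload
  when Hash then payload
  when String
    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      nil
    end
  end
end

p parse_payload({ 'jid' => 'abc123' })['jid'] # => "abc123" (Hash passes through)
p parse_payload('{"jid":"abc123"}')['jid']    # => "abc123" (String is parsed)
p parse_payload('{not json')                  # => nil
p parse_payload(42)                           # => nil (no matching branch)
```

The nil result is what sends track! down its early `:recovered` path without bumping any counter.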
.recovery_count(jid) ⇒ Object
Reads the current recovery counter without bumping it. Used by tests and dashboards; returns 0 for jobs that have never been recovered.
# File 'lib/wurk/middleware/poison_pill.rb', line 80
def recovery_count(jid)
  return 0 if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('GET', "#{KEY_PREFIX}#{jid}") }.to_i
end
.reset! ⇒ Object
Test-only reset.
# File 'lib/wurk/middleware/poison_pill.rb', line 109
def reset!
  @callbacks = []
end
.track!(payload, queue: nil, config: Wurk.configuration) ⇒ Symbol
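Called per recovered orphan job. Returns `:poison` when this recovery brings the counter to RECOVERY_THRESHOLD and the job was killed. Returns `:recovered` when the job is being re-pushed, which is the caller’s responsibility: we don’t touch the queue here. Unparseable payloads and jobs without a jid always return `:recovered`. Emits `jobs.recovered.fetch` on every call with a parseable payload, and `jobs.poison` only on the kill path.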
Fires the Pro `super_fetch!` recovery callback (config.super_fetch_callback) exactly once per call: `(jobstr, nil)` on plain recovery, `(jobstr, pill)` on the kill path. The poison-only `on_poison` Hash callbacks fire independently inside #mark_poison.
# File 'lib/wurk/middleware/poison_pill.rb', line 56
def track!(payload, queue: nil, config: Wurk.configuration)
  job = parse(payload)
  unless job
    fire_super_fetch(config, payload, nil)
    return :recovered
  end

  jid = job['jid']
  klass = job['class']
  emit_recovered_fetch(klass, queue)
  count = bump_counter(jid) if jid && !jid.empty?

  if count && count >= RECOVERY_THRESHOLD
    mark_poison(payload, job, queue: queue, count: count)
    fire_super_fetch(config, payload, Pill.new(jid: jid, klass: klass, count: count, queue: queue))
    :poison
  else
    fire_super_fetch(config, payload, nil)
    :recovered
  end
end
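The following condensed re-implementation of track! exercises the paths described above. `Tracker` is hypothetical and accepts String payloads only. A Hash replaces the Redis counter, `JSON.parse` stands in for `Wurk.load_json`, and a Struct stands in for the real Pill class. Every call records exactly one `[jobstr, pill]` pair, with `nil` as the pill except on the kill path.

```ruby
require 'json'

# Hypothetical condensed version of track! for illustration only.
# Stand-ins: a Hash for Redis, JSON.parse for Wurk.load_json, a Struct for Pill.
class Tracker
  RECOVERY_THRESHOLD = 3
  Pill = Struct.new(:jid, :klass, :count, :queue, keyword_init: true)

  attr_reader :super_fetch_calls, :dead

  def initialize
    @counters = Hash.new(0)
    @super_fetch_calls = [] # [jobstr, pill] pairs, as a Pro block would see them
    @dead = []
  end

  def track!(payload, queue: nil)
    job = parse(payload)
    return recovered(payload) unless job

    jid = job['jid']
    count = (@counters[jid] += 1) if jid && !jid.empty?
    return recovered(payload) unless count && count >= RECOVERY_THRESHOLD

    @dead << payload
    @super_fetch_calls << [payload, Pill.new(jid: jid, klass: job['class'], count: count, queue: queue)]
    :poison
  end

  private

  def recovered(payload)
    @super_fetch_calls << [payload, nil]
    :recovered
  end

  def parse(payload)
    JSON.parse(payload)
  rescue JSON::ParserError
    nil
  end
end

t = Tracker.new
job = JSON.generate({ 'jid' => 'abc123', 'class' => 'HardWorker' })
outcomes = Array.new(3) { t.track!(job, queue: 'default') }
p outcomes                                        # => [:recovered, :recovered, :poison]
p t.track!('{garbage', queue: 'default')          # => :recovered
p t.track!('{"class":"NoJid"}', queue: 'default') # => :recovered
p t.super_fetch_calls.map { |_, pill| pill ? pill[:count] : nil } # => [nil, nil, 3, nil, nil]
```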