Module: Wurk::Middleware::PoisonPill

Defined in:
lib/wurk/middleware/poison_pill.rb

Overview

Pro parity (§3.2): poison-pill detection for reliable-fetch orphans. When a job is recovered out of a dead process’s private list, we INCR a per-jid counter at `super_fetch:recovered:<jid>` with a 72h TTL. Once the counter reaches RECOVERY_THRESHOLD (3), that recovery is treated as a poison pill: the payload is moved to the dead set, `jobs.poison` is emitted to statsd, and recovery callbacks fire so operators can be paged.

Counter key TTL is wire-compatible with Sidekiq Pro: third-party tooling that watches `super_fetch:recovered:*` expects 72h.

No server-middleware registration: callers are reaper / bulk_requeue paths that drive the lifecycle directly via `track!(payload, queue:)`. When that integration lands, it just calls `PoisonPill.track!` on each orphan it is about to RPUSH back to the public queue.
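
The counting rule described above can be sketched without Redis. This is a minimal illustration, not the module's implementation: `PoisonPillFlowSketch`, `FakeCounterStore`, and `classify_recovery` are hypothetical names, and the in-memory hash only stands in for the `INCR` on the per-jid key.

```ruby
module PoisonPillFlowSketch
  RECOVERY_THRESHOLD = 3
  KEY_PREFIX = 'super_fetch:recovered:'

  # Hypothetical in-memory stand-in for the Redis counter (no TTL handling).
  class FakeCounterStore
    def initialize
      @counts = Hash.new(0)
    end

    # Mirrors INCR: increments the key and returns the new value.
    def incr(key)
      @counts[key] += 1
    end
  end

  # Returns :poison once the per-jid count reaches the threshold,
  # otherwise :recovered.
  def self.classify_recovery(store, jid)
    count = store.incr("#{KEY_PREFIX}#{jid}")
    count >= RECOVERY_THRESHOLD ? :poison : :recovered
  end
end

store = PoisonPillFlowSketch::FakeCounterStore.new
outcomes = Array.new(3) { PoisonPillFlowSketch.classify_recovery(store, 'abc123') }
p outcomes # prints [:recovered, :recovered, :poison]
```

With a threshold of 3, the first two recoveries of a jid are re-pushed and the third is the one that gets killed.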

Defined Under Namespace

Classes: Pill

Constant Summary

RECOVERY_THRESHOLD = 3
RECOVERY_TTL = 72 * 60 * 60
KEY_PREFIX = 'super_fetch:recovered:'
DEAD_RECORD_LIMIT = 100

Class Method Details

.bump_counter(jid) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 127

def bump_counter(jid)
  key = "#{KEY_PREFIX}#{jid}"
  Wurk.redis do |conn|
    count = conn.call('INCR', key).to_i
    conn.call('EXPIRE', key, RECOVERY_TTL)
    count
  end
end
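
To make the command sequence concrete, here is a hedged sketch that replays the same `INCR` then `EXPIRE` pair against a fake connection. `BumpCounterSketch` and `FakeConn` are hypothetical stand-ins for `Wurk.redis`; the fake only emulates the two commands used here.

```ruby
module BumpCounterSketch
  KEY_PREFIX = 'super_fetch:recovered:'
  RECOVERY_TTL = 72 * 60 * 60 # 259_200 seconds

  # Hypothetical fake connection: records every command and emulates INCR.
  class FakeConn
    attr_reader :commands

    def initialize
      @commands = []
      @counts = Hash.new(0)
    end

    def call(*args)
      @commands << args
      case args.first
      when 'INCR' then @counts[args[1]] += 1
      when 'EXPIRE' then 1
      end
    end
  end

  # Same shape as the listing above, with the connection passed in.
  def self.bump_counter(conn, jid)
    key = "#{KEY_PREFIX}#{jid}"
    count = conn.call('INCR', key).to_i
    conn.call('EXPIRE', key, RECOVERY_TTL)
    count
  end
end

conn = BumpCounterSketch::FakeConn.new
BumpCounterSketch.bump_counter(conn, 'abc123')
BumpCounterSketch.bump_counter(conn, 'abc123')
conn.commands.each { |cmd| puts cmd.join(' ') }
```

Because `EXPIRE` is issued on every bump, each recovery restarts the 72h window rather than counting from the first recovery.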

.callbacks ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 104

def callbacks
  @callbacks ||= []
end

.clear!(jid) ⇒ Object

Resets the counter for a jid — call after a successful perform so a job that recovered twice and then completed doesn’t accumulate state.



# File 'lib/wurk/middleware/poison_pill.rb', line 88

def clear!(jid)
  return if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('DEL', "#{KEY_PREFIX}#{jid}") }
end
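
One way a caller might wire this in is to clear the counter only after the job body returns normally. The sketch below assumes that pattern; `ClearSketch` and `perform_and_clear` are hypothetical, and a plain Hash stands in for Redis so that `delete` plays the role of `DEL`.

```ruby
module ClearSketch
  KEY_PREFIX = 'super_fetch:recovered:'

  # Runs the job body, then drops the recovery counter on success.
  # If the block raises, the counter is left in place.
  def self.perform_and_clear(store, jid)
    result = yield
    store.delete("#{KEY_PREFIX}#{jid}") unless jid.nil? || jid.to_s.empty?
    result
  end
end

# A job that was recovered twice and then completes.
store = { 'super_fetch:recovered:abc123' => 2 }
ClearSketch.perform_and_clear(store, 'abc123') { :done }
p store # prints {}
```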

.emit_poison(klass, queue) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 150

def emit_poison(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.poison', tags: tags.empty? ? nil : tags)
end

.emit_recovered_fetch(klass, queue) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 136

def emit_recovered_fetch(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.recovered.fetch', tags: tags.empty? ? nil : tags)
end

.fire_callbacks(pill) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 157

def fire_callbacks(pill)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    Wurk.configuration.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
  end
end

.fire_super_fetch(config, payload, pill) ⇒ Object

Invoke the Pro recovery callback registered via `config.super_fetch! { }`. No-op unless one is registered. `jobstr` is the raw job JSON so a Pro `|jobstr, pill|` block sees exactly what Sidekiq Pro hands it.



# File 'lib/wurk/middleware/poison_pill.rb', line 168

def fire_super_fetch(config, payload, pill)
  cb = config.super_fetch_callback
  return unless cb

  jobstr = payload.is_a?(::String) ? payload : Wurk.dump_json(payload)
  cb.call(jobstr, pill)
rescue StandardError => e
  config.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
end
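
The sketch below shows what a `|jobstr, pill|` block might look like from the consumer's side. It is an illustration under assumptions: `SuperFetchSketch` is hypothetical, its `Pill` Struct is only a stand-in for the real `Pill` class (whose definition is not shown on this page), and `JSON.generate` stands in for `Wurk.dump_json`.

```ruby
require 'json'

module SuperFetchSketch
  # Stand-in for the Pill class; only the keyword constructor is assumed.
  Pill = Struct.new(:jid, :klass, :count, :queue, keyword_init: true)

  # Normalizes the payload to a JSON string, then invokes the callback.
  def self.fire(callback, payload, pill)
    return unless callback

    jobstr = payload.is_a?(String) ? payload : JSON.generate(payload)
    callback.call(jobstr, pill)
  end
end

log = []
callback = lambda do |jobstr, pill|
  # pill is nil on plain recovery and set on the kill path.
  log << (pill ? "poison #{pill.jid} after #{pill.count} recoveries" : "recovered #{jobstr}")
end

SuperFetchSketch.fire(callback, '{"jid":"abc123"}', nil)
pill = SuperFetchSketch::Pill.new(jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default')
SuperFetchSketch.fire(callback, { 'jid' => 'abc123' }, pill)
puts log
```

A block written this way can branch on `pill.nil?` to tell a routine recovery from a kill.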

.mark_poison(payload, job, queue:, count:) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 143

def mark_poison(payload, job, queue:, count:)
  emit_poison(job['class'], queue)
  json = payload.is_a?(String) ? payload : Wurk.dump_json(job)
  Wurk::DeadSet.new.kill(json, notify_failure: false)
  fire_callbacks(jid: job['jid'], klass: job['class'], count: count, queue: queue)
end

.on_poison(&block) ⇒ Object

Register a callback fired when a poison pill is detected. Callbacks receive a single Hash with keys `jid:, klass:, count:, queue:`, which matches Sidekiq Pro’s documented shape so consumers can drop in unchanged.

Raises:

  • (ArgumentError)


# File 'lib/wurk/middleware/poison_pill.rb', line 97

def on_poison(&block)
  raise ArgumentError, 'block required' unless block

  callbacks << block
  block
end
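
As a usage illustration, the following sketch registers two callbacks and fires them with the Hash shape that `.mark_poison` passes. `OnPoisonSketch` is a hypothetical stand-in for this module; collecting exceptions in an `errors` array replaces the call to `Wurk.configuration.handle_exception`.

```ruby
module OnPoisonSketch
  @callbacks = []
  @errors = []

  class << self
    attr_reader :callbacks, :errors

    def on_poison(&block)
      raise ArgumentError, 'block required' unless block

      callbacks << block
      block
    end

    # One failing callback must not stop the others from running.
    def fire_callbacks(pill)
      callbacks.each do |cb|
        cb.call(pill)
      rescue StandardError => e
        errors << e
      end
    end
  end
end

paged = []
OnPoisonSketch.on_poison { |_pill| raise 'pager down' }
OnPoisonSketch.on_poison do |pill|
  paged << "#{pill[:klass]} #{pill[:jid]} recovered #{pill[:count]} times on #{pill[:queue]}"
end

OnPoisonSketch.fire_callbacks(jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default')
puts paged
```

The first callback raises, yet the second still runs, which is the isolation the `rescue` in `.fire_callbacks` provides.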

.parse(payload) ⇒ Object

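Internal helper. Normalizes a payload to a Hash: a Hash is returned as-is, a String is parsed as JSON, and `nil` is returned for unparseable JSON or any other type.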



# File 'lib/wurk/middleware/poison_pill.rb', line 115

def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      Wurk.load_json(payload)
    rescue ::JSON::ParserError
      nil
    end
  end
end
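
The three outcomes can be checked with the standard library alone. This sketch assumes `Wurk.load_json` behaves like `JSON.parse` for these inputs; `ParseSketch` is a hypothetical name.

```ruby
require 'json'

module ParseSketch
  # Same branching as the listing above, with JSON.parse as the parser.
  def self.parse(payload)
    case payload
    when Hash then payload
    when String
      begin
        JSON.parse(payload)
      rescue JSON::ParserError
        nil
      end
    end
  end
end

p ParseSketch.parse('{"jid":"abc123","class":"HardJob"}') # Hash with string keys
p ParseSketch.parse('not json')                           # nil: parse error is swallowed
p ParseSketch.parse(42)                                   # nil: no case branch matches
```

A `nil` result is what sends `.track!` down its early-return path, where no counter is bumped.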

.recovery_count(jid) ⇒ Object

Reads the current recovery counter without bumping it. Used by tests and dashboards; returns 0 for jobs that have never been recovered.



# File 'lib/wurk/middleware/poison_pill.rb', line 80

def recovery_count(jid)
  return 0 if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('GET', "#{KEY_PREFIX}#{jid}") }.to_i
end

.reset! ⇒ Object

Test-only reset.



# File 'lib/wurk/middleware/poison_pill.rb', line 109

def reset!
  @callbacks = []
end

.track!(payload, queue: nil, config: Wurk.configuration) ⇒ Symbol

Called per recovered orphan job. Returns `:poison` when the threshold was crossed and the job was killed; `:recovered` when the job is being re-pushed (the re-push is the caller’s responsibility; we don’t touch the queue here). Emits `jobs.recovered.fetch` on every call that parses, `jobs.poison` only on the kill path.

Fires the Pro `super_fetch!` recovery callback (`config.super_fetch_callback`) exactly once per call: `(jobstr, nil)` on plain recovery, `(jobstr, pill)` on the kill path. The poison-only `on_poison` Hash callbacks fire independently inside `.mark_poison`.

Parameters:

  • payload (String, Hash)

    the job JSON or pre-parsed hash.

  • queue (String, nil) (defaults to: nil)

    the public queue name (without the `queue:` prefix).

  • config (Configuration) (defaults to: Wurk.configuration)

    config that owns the super_fetch! callback; defaults to the global so non-reaper callers (tests) need not pass it.

Returns:

  • (Symbol)

    :recovered | :poison



# File 'lib/wurk/middleware/poison_pill.rb', line 56

def track!(payload, queue: nil, config: Wurk.configuration)
  job = parse(payload)
  unless job
    fire_super_fetch(config, payload, nil)
    return :recovered
  end

  jid = job['jid']
  klass = job['class']
  emit_recovered_fetch(klass, queue)

  count = bump_counter(jid) if jid && !jid.empty?
  if count && count >= RECOVERY_THRESHOLD
    mark_poison(payload, job, queue: queue, count: count)
    fire_super_fetch(config, payload, Pill.new(jid: jid, klass: klass, count: count, queue: queue))
    :poison
  else
    fire_super_fetch(config, payload, nil)
    :recovered
  end
end
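
Since `.track!` leaves the queue untouched, the caller decides what to do with each return value. The sketch below shows one possible reaper loop; `ReaperSketch`, `requeue_orphans`, and the Array used as the public queue are hypothetical. In a real integration the `tracker:` argument would presumably be `Wurk::Middleware::PoisonPill.method(:track!)` and the append would be an `RPUSH`.

```ruby
module ReaperSketch
  # tracker: anything responding to call(payload, queue:) that returns
  # :recovered or :poison, as .track! does.
  def self.requeue_orphans(orphans, queue:, tracker:, public_queue:)
    orphans.each do |payload|
      case tracker.call(payload, queue: queue)
      when :recovered
        public_queue << payload # stand-in for RPUSH to the public queue
      when :poison
        next # already moved to the dead set; do not re-push
      end
    end
    public_queue
  end
end

# Fake tracker for illustration: flags one known jid as poison.
fake_tracker = ->(payload, queue:) { payload.include?('"bad"') ? :poison : :recovered }

orphans = ['{"jid":"ok1"}', '{"jid":"bad"}', '{"jid":"ok2"}']
requeued = ReaperSketch.requeue_orphans(
  orphans, queue: 'default', tracker: fake_tracker, public_queue: []
)
p requeued # prints ["{\"jid\":\"ok1\"}", "{\"jid\":\"ok2\"}"]
```

Skipping the re-push on `:poison` matters: the payload has already been sent to the dead set, so pushing it again would put the job in two places.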