Module: Wurk::Middleware::PoisonPill

Defined in:
lib/wurk/middleware/poison_pill.rb

Overview

Pro parity (§3.2): poison-pill detection for reliable-fetch orphans. When a job is recovered out of a dead process's private list, we INCR a per-jid counter at `super_fetch:recovered:<jid>` with a 72h TTL (refreshed on every recovery). The recovery that brings the counter to RECOVERY_THRESHOLD (3) is treated as a poison pill: the payload is moved to the dead set, `jobs.poison` is emitted to statsd, and the registered `on_poison` callbacks fire so operators can be paged.
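To make the threshold concrete, here is a minimal Ruby sketch that assumes a plain Hash in place of Redis and ignores the TTL; `ThresholdSketch` and `classify_recovery` are illustrative names, not part of Wurk. With RECOVERY_THRESHOLD at 3, the first two recoveries of a jid come back as re-pushable and the third is flagged.

```ruby
# Illustrative only: a Hash stands in for the Redis counter and the 72h TTL is ignored.
module ThresholdSketch
  RECOVERY_THRESHOLD = 3

  # Models INCR super_fetch:recovered:<jid> followed by the threshold check.
  def self.classify_recovery(counters, jid)
    counters[jid] += 1
    counters[jid] >= RECOVERY_THRESHOLD ? :poison : :recovered
  end
end

counters = Hash.new(0)
outcomes = Array.new(3) { ThresholdSketch.classify_recovery(counters, 'abc123') }
p outcomes # => [:recovered, :recovered, :poison]
```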

The counter key's TTL is wire-compatible with Sidekiq Pro: third-party tooling that watches `super_fetch:recovered:*` expects 72h.

There is no server-middleware registration: callers are the reaper / bulk_requeue paths, which drive the lifecycle directly via `track!(payload, queue:)`. When that integration lands, it just calls `PoisonPill.track!` on each orphan it is about to RPUSH back to the public queue.
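A caller on the reaper side might look roughly like the sketch below. It is hypothetical: `requeue_orphans`, the lambda `tracker` (standing in for `PoisonPill.track!`), and the Array used as the public queue are all assumptions. The point is that only `:recovered` orphans are pushed back; `:poison` ones have already been moved to the dead set.

```ruby
# Hypothetical reaper-side loop; not Wurk's actual reaper code.
# `tracker` stands in for PoisonPill.track!, `public_queue` for the Redis list.
def requeue_orphans(orphans, queue, tracker, public_queue)
  orphans.each do |payload|
    # track! never touches the queue, so the caller decides whether to RPUSH.
    next if tracker.call(payload, queue) == :poison

    public_queue.push(payload) # models RPUSH queue:<name> payload
  end
  public_queue
end

# Stub tracker: pretends the payload for jid "bad" has hit the threshold.
tracker = ->(payload, _queue) { payload.include?('"bad"') ? :poison : :recovered }
requeued = requeue_orphans(['{"jid":"a"}', '{"jid":"bad"}'], 'default', tracker, [])
p requeued # => ["{\"jid\":\"a\"}"]
```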

Constant Summary

RECOVERY_THRESHOLD = 3
RECOVERY_TTL = 72 * 60 * 60
KEY_PREFIX = 'super_fetch:recovered:'
DEAD_RECORD_LIMIT = 100
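For reference, RECOVERY_TTL works out to 259,200 seconds, and every method builds its Redis key by appending the jid to KEY_PREFIX. A quick Ruby check (the module name `PoisonPillConstants` and the jid are made up):

```ruby
# Restates the documented constants; the module name is made up.
module PoisonPillConstants
  RECOVERY_TTL = 72 * 60 * 60 # seconds
  KEY_PREFIX = 'super_fetch:recovered:'

  # Same interpolation the module uses: "#{KEY_PREFIX}#{jid}".
  def self.key_for(jid)
    "#{KEY_PREFIX}#{jid}"
  end
end

puts PoisonPillConstants::RECOVERY_TTL     # prints 259200
puts PoisonPillConstants.key_for('abc123') # prints super_fetch:recovered:abc123
```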


Class Method Details

.bump_counter(jid) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 110

def bump_counter(jid)
  key = "#{KEY_PREFIX}#{jid}"
  Wurk.redis do |conn|
    count = conn.call('INCR', key).to_i
    conn.call('EXPIRE', key, RECOVERY_TTL)
    count
  end
end
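Because EXPIRE is issued after every INCR, the 72h window is measured from the most recent recovery rather than the first one. The sketch below simulates that with an in-memory store and an explicit clock; `FakeCounterStore` is an assumption for illustration and only models the INCR/EXPIRE/GET behavior that bump_counter and recovery_count rely on.

```ruby
# In-memory stand-in for the Redis commands bump_counter and recovery_count use.
# Time is passed explicitly so key expiry can be simulated.
class FakeCounterStore
  def initialize
    @values = {}
    @expires_at = {}
  end

  def incr(key, now)
    drop_if_expired(key, now)
    @values[key] = @values.fetch(key, 0) + 1
  end

  def expire(key, ttl, now)
    @expires_at[key] = now + ttl
  end

  def get(key, now)
    drop_if_expired(key, now)
    @values[key]
  end

  private

  # Deletes the key once its deadline has passed, like Redis key expiry.
  def drop_if_expired(key, now)
    deadline = @expires_at[key]
    return unless deadline && now >= deadline

    @values.delete(key)
    @expires_at.delete(key)
  end
end

ttl = 72 * 60 * 60
store = FakeCounterStore.new
key = 'super_fetch:recovered:abc123'

# Same two steps as bump_counter: INCR, then EXPIRE with the full TTL.
bump = lambda do |now|
  count = store.incr(key, now)
  store.expire(key, ttl, now)
  count
end

bump.call(0)       # first recovery at t = 0
bump.call(ttl - 1) # second recovery one second before the first window closes
after_refresh = store.get(key, ttl + 10) # still alive: window restarted at t = ttl - 1
after_expiry = store.get(key, 2 * ttl)   # gone: more than a full TTL since the last bump
p after_refresh # => 2
p after_expiry  # => nil
```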

.callbacks ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 87

def callbacks
  @callbacks ||= []
end

.clear!(jid) ⇒ Object

Resets the counter for a jid. Call it after a successful perform so that a job that was recovered twice and then completed does not keep a stale count that would push its next recovery over the threshold.



# File 'lib/wurk/middleware/poison_pill.rb', line 71

def clear!(jid)
  return if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('DEL', "#{KEY_PREFIX}#{jid}") }
end
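One way a caller could honor the "clear after a successful perform" advice is a small server middleware. This is a hypothetical sketch: the `call(worker, job, queue)` shape is assumed from Sidekiq-style middleware, `ClearRecoveryCounter` is an invented name, and `StubPill` stands in for `Wurk::Middleware::PoisonPill`. Since `clear!` runs only after `yield` returns, a job whose perform raises keeps its counter.

```ruby
# Hypothetical server middleware; the call(worker, job, queue) shape is an
# assumption modeled on Sidekiq-style middleware, not Wurk's documented API.
class ClearRecoveryCounter
  def initialize(pill)
    @pill = pill # anything that responds to clear!(jid)
  end

  def call(_worker, job, _queue)
    result = yield
    @pill.clear!(job['jid']) # only reached when perform did not raise
    result
  end
end

# Stub standing in for Wurk::Middleware::PoisonPill.
class StubPill
  attr_reader :cleared

  def initialize
    @cleared = []
  end

  def clear!(jid)
    @cleared << jid
  end
end

pill = StubPill.new
mw = ClearRecoveryCounter.new(pill)
mw.call(nil, { 'jid' => 'ok1' }, 'default') { :done }
begin
  mw.call(nil, { 'jid' => 'boom1' }, 'default') { raise 'perform failed' }
rescue RuntimeError
  # the failing job keeps its counter
end
p pill.cleared # => ["ok1"]
```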

.emit_poison(klass, queue) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 133

def emit_poison(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.poison', tags: tags.empty? ? nil : tags)
end

.emit_recovered_fetch(klass, queue) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 119

def emit_recovered_fetch(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  Wurk::Metrics::Statsd.increment('jobs.recovered.fetch', tags: tags.empty? ? nil : tags)
end
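emit_poison and emit_recovered_fetch build their tags identically, and both pass `nil` rather than an empty array when neither class nor queue is known. A sketch of that shared logic as a single helper (the name `statsd_tags` is hypothetical):

```ruby
# Tag logic shared by emit_poison and emit_recovered_fetch, pulled into one
# hypothetical helper.
def statsd_tags(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  tags.empty? ? nil : tags # nil rather than [] when there is nothing to tag
end

p statsd_tags('HardJob', 'default') # => ["class:HardJob", "queue:default"]
p statsd_tags(nil, 'default')       # => ["queue:default"]
p statsd_tags(nil, nil)             # => nil
```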

.fire_callbacks(pill) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 140

def fire_callbacks(pill)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    Wurk.configuration.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
  end
end
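The per-callback `rescue` means one failing handler cannot stop the others from running. A self-contained sketch of that loop, where an Array of messages stands in for `Wurk.configuration.handle_exception` (an assumption for illustration; `fire_all` is an invented name):

```ruby
# Same rescue-per-callback loop as fire_callbacks (Ruby 2.6+ allows rescue in do/end).
# `errors` stands in for Wurk.configuration.handle_exception.
def fire_all(callbacks, pill, errors)
  callbacks.each do |cb|
    cb.call(pill)
  rescue StandardError => e
    errors << e.message
  end
end

seen = []
errors = []
callbacks = [
  ->(pill) { raise "pager down for #{pill[:jid]}" },
  ->(pill) { seen << pill[:jid] }
]
fire_all(callbacks, { jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default' }, errors)
p seen   # => ["abc123"]
p errors # => ["pager down for abc123"]
```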

.mark_poison(payload, job, queue:, count:) ⇒ Object



# File 'lib/wurk/middleware/poison_pill.rb', line 126

def mark_poison(payload, job, queue:, count:)
  emit_poison(job['class'], queue)
  json = payload.is_a?(String) ? payload : Wurk.dump_json(job)
  Wurk::DeadSet.new.kill(json, notify_failure: false)
  fire_callbacks(jid: job['jid'], klass: job['class'], count: count, queue: queue)
end
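mark_poison hands the dead set the caller's original string when it has one and only re-serializes when it was given a Hash, presumably so the dead record keeps the exact bytes that were recovered. Sketched in Ruby with `JSON.generate` assumed as a stand-in for `Wurk.dump_json` (`dead_set_payload` is an invented name):

```ruby
require 'json'

# Mirrors mark_poison's choice of what to hand to the dead set.
# JSON.generate stands in for Wurk.dump_json (assumption).
def dead_set_payload(payload, job)
  payload.is_a?(String) ? payload : JSON.generate(job)
end

raw = '{ "class": "HardJob", "jid": "abc123" }'
job = JSON.parse(raw)
p dead_set_payload(raw, job) == raw # => true (original string, spacing included)
p dead_set_payload(job, job)        # => "{\"class\":\"HardJob\",\"jid\":\"abc123\"}"
```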

.on_poison(&block) ⇒ Object

Register a callback fired when a poison pill is detected. Callbacks receive a single Hash `{ jid:, klass:, count:, queue: }`, which matches Sidekiq Pro's documented shape so consumers can drop in unchanged. Returns the registered block.

Raises:

  • (ArgumentError) if no block is given.


# File 'lib/wurk/middleware/poison_pill.rb', line 80

def on_poison(&block)
  raise ArgumentError, 'block required' unless block

  callbacks << block
  block
end
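A registration sketch follows. `PillRegistry` is a hypothetical stand-in that mirrors on_poison's contract (block required, block returned); in real code you would call `Wurk::Middleware::PoisonPill.on_poison` instead, and the handler body (recording a message rather than paging anyone) is illustrative.

```ruby
# Hypothetical stand-in mirroring on_poison's contract; not Wurk's module.
module PillRegistry
  def self.callbacks
    @callbacks ||= []
  end

  def self.on_poison(&block)
    raise ArgumentError, 'block required' unless block

    callbacks << block
    block
  end
end

pages = []
handle = PillRegistry.on_poison do |pill|
  # A real handler might page an operator; this one just records a message.
  pages << "poison pill #{pill[:klass]} (#{pill[:jid]}) after #{pill[:count]} recoveries"
end

pill = { jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default' }
PillRegistry.callbacks.each { |cb| cb.call(pill) }
p pages.first # => "poison pill HardJob (abc123) after 3 recoveries"

begin
  PillRegistry.on_poison # no block given
rescue ArgumentError => e
  p e.message # => "block required"
end
```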

.parse(payload) ⇒ Object

--- internals ---



# File 'lib/wurk/middleware/poison_pill.rb', line 98

def parse(payload)
  case payload
  when Hash then payload
  when String
    begin
      Wurk.load_json(payload)
    rescue ::JSON::ParserError
      nil
    end
  end
end
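parse returns `nil` for malformed JSON and for anything that is neither a Hash nor a String, which is why track! treats such payloads as `:recovered` without counting them. The sketch below mirrors its branching, assuming `JSON.parse` in place of `Wurk.load_json` (`parse_payload` is an invented name):

```ruby
require 'json'

# Same branching as parse; JSON.parse stands in for Wurk.load_json (assumption).
def parse_payload(payload)
  case payload
  when Hash then payload
  when String
    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      nil
    end
  end # any other type falls through the case and yields nil
end

p parse_payload('{"jid":"abc123"}')['jid'] # => "abc123"
p parse_payload('not json')                # => nil
p parse_payload(42)                        # => nil
```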

.recovery_count(jid) ⇒ Object

Reads the current recovery counter without bumping it. Used by tests and dashboards; returns 0 for jobs that have never been recovered.



# File 'lib/wurk/middleware/poison_pill.rb', line 63

def recovery_count(jid)
  return 0 if jid.nil? || jid.to_s.empty?

  Wurk.redis { |conn| conn.call('GET', "#{KEY_PREFIX}#{jid}") }.to_i
end

.reset! ⇒ Object

Test-only reset: clears every registered `on_poison` callback. Redis counters are left untouched.



# File 'lib/wurk/middleware/poison_pill.rb', line 92

def reset!
  @callbacks = []
end

.track!(payload, queue: nil) ⇒ Symbol

Called once per recovered orphan job. Returns `:poison` when the threshold was reached and the job was killed; `:recovered` when the job should be re-pushed (the caller's responsibility; we don't touch the queue here). Emits `jobs.recovered.fetch` on every call whose payload parses, and `jobs.poison` only on the kill path. A payload without a jid is never counted and always returns `:recovered`.

Parameters:

  • payload (String, Hash)

    the job JSON or pre-parsed hash.

  • queue (String, nil) (defaults to: nil)

    the public queue name (without the `queue:` prefix).

Returns:

  • (Symbol)

    :recovered | :poison



# File 'lib/wurk/middleware/poison_pill.rb', line 43

def track!(payload, queue: nil)
  job = parse(payload)
  return :recovered unless job

  jid = job['jid']
  klass = job['class']
  emit_recovered_fetch(klass, queue)
  return :recovered if jid.nil? || jid.empty?

  count = bump_counter(jid)
  if count >= RECOVERY_THRESHOLD
    mark_poison(payload, job, queue: queue, count: count)
    :poison
  else
    :recovered
  end
end
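The whole decision path of track! can be exercised without Redis. In this sketch (`TrackSketch` is hypothetical), a Hash replaces the Redis counter and Arrays replace statsd and the dead set, and parsing is simplified to Hash-or-JSON-string. It shows that an unparseable payload emits nothing, a payload without a jid emits only `jobs.recovered.fetch`, and the third recovery of the same jid is the one that gets killed.

```ruby
require 'json'

# Hypothetical re-creation of track!'s control flow. A Hash replaces the Redis
# counter; Arrays replace statsd and the dead set.
class TrackSketch
  THRESHOLD = 3

  attr_reader :metrics, :dead

  def initialize
    @counters = Hash.new(0)
    @metrics = []
    @dead = []
  end

  def track!(payload, queue: nil)
    job = parse(payload)
    return :recovered unless job # unparseable: no metric, no counter

    jid = job['jid']
    @metrics << 'jobs.recovered.fetch'
    return :recovered if jid.nil? || jid.empty? # no jid: metric only

    count = (@counters[jid] += 1)
    return :recovered if count < THRESHOLD

    @metrics << 'jobs.poison'
    @dead << payload
    :poison
  end

  private

  # Simplified parse: Hashes pass through, Strings are decoded as JSON.
  def parse(payload)
    payload.is_a?(Hash) ? payload : JSON.parse(payload)
  rescue JSON::ParserError
    nil
  end
end

t = TrackSketch.new
results = [
  t.track!('not json', queue: 'default'),
  t.track!('{"class":"HardJob"}', queue: 'default'),
  *Array.new(3) { t.track!('{"class":"HardJob","jid":"abc123"}', queue: 'default') }
]
p results # => [:recovered, :recovered, :recovered, :recovered, :poison]
p t.metrics.count('jobs.recovered.fetch') # => 4
p t.dead.size                             # => 1
```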