Module: Wurk::Batch::Callbacks

Defined in:
lib/wurk/batch/callbacks.rb

Overview

Fires batch callbacks (`:success`, `:complete`, `:death`) by enqueuing them as ordinary jobs on the batch’s `callback_queue` (`default` when unset). Dedup is via a per-event notify key, `b-<bid>-<event>` (e.g. `b-<bid>-success`), so the same event’s callbacks can’t be enqueued twice even if multiple workers race to ack the final job.

Callback wrapper job: Wurk::Batch::CallbackJob — given a target spec (“Klass” or “Klass#method”) and options hash, it instantiates and invokes on_<event> (or the named method) with the Status snapshot.
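
The target-spec dispatch described above can be sketched as follows. This is a minimal illustration, not the CallbackJob source: it assumes the callback method receives `(status, options)` (the Sidekiq Pro convention; this page only mentions "the Status snapshot" and an options hash), and `MyCallbacks`, `resolve_callback` and `invoke_callback` are hypothetical names.

```ruby
# Hypothetical callback class used only for this illustration.
class MyCallbacks
  def on_success(status, options)
    "success for #{status[:bid]} (#{options['note']})"
  end

  def shipped(status, _options)
    "shipped #{status[:bid]}"
  end
end

# Resolve "Klass" or "Klass#method" into a receiver and a method name.
# A plain "Klass" dispatches to on_<event>; "Klass#method" names the method.
def resolve_callback(target, event)
  klass_name, method_name = target.split('#', 2)
  method_name ||= "on_#{event}"
  [Object.const_get(klass_name).new, method_name]
end

def invoke_callback(target, event, status, options)
  receiver, method_name = resolve_callback(target, event)
  receiver.public_send(method_name, status, options)
end

status = { bid: 'abc123' } # stand-in for the Status snapshot
puts invoke_callback('MyCallbacks', 'success', status, { 'note' => 'hi' })
puts invoke_callback('MyCallbacks#shipped', 'complete', status, {})
```

`Object.const_get` also resolves namespaced names such as `"Billing::Callbacks"`, so the same split works for nested classes.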


Class Method Details

.any_child_dead?(bid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 188

def any_child_dead?(bid)
  kids = Wurk.redis { |conn| conn.call('SMEMBERS', "b-#{bid}-kids") }
  kids.any? { |kid| death_fired?(kid) }
end

.apply_linger(bid) ⇒ Object

Post-success retention: a succeeded batch no longer coordinates any jobs, so every key in `Batch.keys_for(bid)` is given a TTL of the per-batch `linger` override (24h when unset) instead of the 30d pending TTL. Mirrors Sidekiq Pro §2.8.



# File 'lib/wurk/batch/callbacks.rb', line 111

def apply_linger(bid)
  raw     = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'linger') }
  seconds = raw.nil? || raw.to_s.empty? ? Batch::POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
  Wurk.redis do |conn|
    Batch.keys_for(bid).each { |key| conn.call('EXPIRE', key, seconds) }
  end
end
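
The TTL choice in apply_linger can be isolated as a pure function. A sketch, assuming `POST_SUCCESS_EXPIRY_SECONDS` is 24 hours as the prose above states; `linger_seconds` is a hypothetical helper name.

```ruby
# Assumed value: the prose says the default retention is 24h.
POST_SUCCESS_EXPIRY_SECONDS = 24 * 60 * 60

# Mirrors apply_linger's choice: a missing or empty `linger` field falls
# back to the default; anything else is parsed as whole seconds.
def linger_seconds(raw)
  raw.nil? || raw.to_s.empty? ? POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
end

p linger_seconds(nil)    # => 86400
p linger_seconds('')     # => 86400
p linger_seconds('3600') # => 3600
p linger_seconds('0')    # => 0
```

Because the fallback triggers only on a missing or empty field, a `linger` of `'0'` (or any non-numeric string, which `to_i` turns into 0) reaches EXPIRE as 0, and Redis deletes a key outright when EXPIRE is given a non-positive timeout.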

.callback_specs_for(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 208

def callback_specs_for(bid)
  raw = Wurk.redis { |conn| conn.call('HMGET', "b-#{bid}", 'callbacks', 'callback_queue') }
  callbacks_json, queue = raw
  queue = 'default' if queue.nil? || queue.empty?
  parsed = parse_callbacks(callbacks_json)
  [parsed, queue]
end

.cascade_death(bid) ⇒ Object

A child’s death means the parent (and every ancestor) can never fully succeed, so `:death` propagates up the parent chain. The mutual recursion (cascade_death calls fire_death, which calls cascade_death) bottoms out at the root, whose parent_bid is nil or empty; fire_death’s own dedup_set guard makes each ancestor’s `:death` fire exactly once even under racing children.



# File 'lib/wurk/batch/callbacks.rb', line 66

def cascade_death(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?

  fire_death(parent_bid)
end
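
The cascade can be modelled in memory to show that repeated deaths in the same leaf still enqueue each ancestor’s `:death` only once. A sketch under stated assumptions: plain Ruby hashes and sets stand in for the Redis keys, and the constants and helpers here are hypothetical.

```ruby
require 'set'

# In-memory stand-ins (hypothetical) for the Redis state fire_death touches.
PARENT     = { 'leaf' => 'mid', 'mid' => 'root', 'root' => '' } # parent_bid
DEATH_MARK = Set.new # durable `death` flag on b-<bid>
NOTIFIED   = Set.new # b-<bid>-death dedup keys
ENQUEUED   = []      # :death callbacks that would be enqueued

# Set#add? returns nil when the member is already present, like SET NX.
def dedup_set(bid)
  NOTIFIED.add?(bid) ? true : false
end

def fire_death(bid)
  DEATH_MARK << bid # the mark is applied before the guard
  return unless dedup_set(bid)

  ENQUEUED << bid
  cascade_death(bid)
end

def cascade_death(bid)
  parent = PARENT[bid]
  return if parent.nil? || parent.empty? # reached the root

  fire_death(parent)
end

fire_death('leaf')
fire_death('leaf') # a second job in the same batch dies
p ENQUEUED # => ["leaf", "mid", "root"]
```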

.clear_death_on_recovery(bid) ⇒ Object

Recovery counterpart to cascade_death (#226). When a descendant’s last dead job is manually retried back to success, the descendant clears its OWN death mark (#212, in BATCH_PUSH). Every ancestor, however, was marked by the death cascade rather than by a jid in its own died set, so nothing ever cleared those marks and each ancestor’s `:success` stayed suppressed forever. This method re-evaluates one batch: it drops the durable death mark and `dead-batches` membership once the batch’s own died set is empty AND no direct child still carries a death mark. The `b-<bid>-death` notify dedup key is deliberately left intact, so a later re-death re-marks the batch (fire_death restores the flag before its own dedup guard) without ever re-enqueuing `:death`.



# File 'lib/wurk/batch/callbacks.rb', line 173

def clear_death_on_recovery(bid)
  return unless death_fired?(bid)
  return if own_died_remaining?(bid)
  return if any_child_dead?(bid)

  Wurk.redis do |conn|
    conn.call('HDEL', "b-#{bid}", 'death')
    conn.call('ZREM', 'dead-batches', bid)
  end
end
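
The three guards can be exercised against an in-memory parent/child pair. A minimal sketch, assuming a hypothetical `BatchState` struct in place of the `b-<bid>` hash, `b-<bid>-died` and `b-<bid>-kids` sets, and an array in place of the `dead-batches` sorted set.

```ruby
# Hypothetical in-memory batch record mirroring the keys the method reads.
BatchState = Struct.new(:death, :died, :kids, keyword_init: true)

def clear_death_on_recovery(batches, dead_batches, bid)
  b = batches.fetch(bid)
  return unless b.death                                    # nothing to clear
  return unless b.died.empty?                              # own dead jid remains
  return if b.kids.any? { |kid| batches.fetch(kid).death } # a child is still dead

  b.death = false
  dead_batches.delete(bid)
end

# Parent P was marked only by the cascade from child C.
batches = {
  'P' => BatchState.new(death: true, died: [], kids: ['C']),
  'C' => BatchState.new(death: true, died: [], kids: [])
}
dead_batches = %w[P C]

clear_death_on_recovery(batches, dead_batches, 'P')
p batches['P'].death # => true (C still carries its mark)

batches['C'].death = false # C's own mark cleared by its recovery
dead_batches.delete('C')
clear_death_on_recovery(batches, dead_batches, 'P')
p batches['P'].death # => false
p dead_batches       # => []
```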

.death_fired?(bid) ⇒ Boolean

True while the batch carries its durable death mark: set when `:death` fires, whether from one of its own jobs dying or from a descendant’s death cascading up, and cleared again on recovery. Suppresses `:success`, which must never fire after any death in the subtree.

Reads the durable `death` field on `b-<bid>` (written by `record_event`), not the `b-<bid>-death` dedup key: the dedup key has its own 30d TTL and can expire while an ancestor batch is still open, after which a late `maybe_fire` would wrongly emit `:success`.

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 145

def death_fired?(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'death') } == '1'
end

.dedup_set(bid, event) ⇒ Object

Atomically marks `bid` as having fired `event` by setting `b-<bid>-<event>` with NX and a `Batch::CALLBACK_NOTIFY_TTL` expiry. Returns true the first time and false thereafter; the caller skips the enqueue when false. SET NX makes this safe under racing acks.



# File 'lib/wurk/batch/callbacks.rb', line 122

def dedup_set(bid, event)
  Wurk.redis do |conn|
    ok = conn.call('SET', "b-#{bid}-#{event}", '1', 'NX', 'EX', Batch::CALLBACK_NOTIFY_TTL)
    ok == 'OK'
  end
end
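
Why SET NX settles the race can be shown with a toy, in-process stand-in for Redis. This is an illustration only: `FakeRedis` is hypothetical, its Mutex plays the role of Redis’s single-threaded command execution, and the TTL is ignored.

```ruby
# Toy stand-in for `SET key value NX`; not a Redis client.
class FakeRedis
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  # Returns 'OK' when the key was absent and is now set, nil otherwise,
  # matching the reply shape dedup_set compares against.
  def set_nx(key, value)
    @lock.synchronize do
      return nil if @data.key?(key)

      @data[key] = value
      'OK'
    end
  end
end

redis = FakeRedis.new
winners = Queue.new

# Eight "workers" race to ack the final job and each tries to fire :success.
threads = Array.new(8) do
  Thread.new { winners << true if redis.set_nx('b-abc-success', '1') == 'OK' }
end
threads.each(&:join)

p winners.size # => 1
```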

.emit_duration_metric(bid) ⇒ Object

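Pro statsd metric (spec §9.3): wall-clock seconds from batch creation to full success, reported as the `batch.duration_dist` distribution. `created_at` is recorded on the same CLOCK_REALTIME epoch read here, so the subtraction is meaningful. No-op without a dogstatsd client, and skipped when `created_at` is missing.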

Strictly best-effort: `fire_success` has already burned the `success` dedup key by the time this runs, so a raise here (e.g. a Redis hiccup on the HGET) would permanently strand the success callbacks and linger that follow, because a retry can’t re-fire them. Swallow and log instead.



# File 'lib/wurk/batch/callbacks.rb', line 97

def emit_duration_metric(bid)
  created = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'created_at') }
  return if created.nil? || created.to_s.empty?

  seconds = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - created.to_f
  Wurk::Metrics::Statsd.distribution('batch.duration_dist', seconds)
rescue StandardError => e
  Wurk.logger.warn("batch #{bid}: duration metric emit failed: #{e.class}: #{e.message}")
  nil
end
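
The same compute-or-log shape can be run without Redis or statsd. A sketch under stated assumptions: `read_created_at` is a hypothetical stand-in for the HGET that either returns a timestamp 42 seconds in the past or raises, and `duration_seconds` returns the value instead of emitting it.

```ruby
require 'logger'

# Hypothetical stand-in for the `created_at` HGET.
def read_created_at(simulate_error:)
  raise IOError, 'connection reset' if simulate_error

  (Process.clock_gettime(Process::CLOCK_REALTIME) - 42.0).to_s
end

# Same shape as emit_duration_metric: compute wall-clock seconds since
# creation, but log and return nil on any StandardError so the caller
# (fire_success) still goes on to enqueue callbacks and apply linger.
def duration_seconds(logger, simulate_error: false)
  created = read_created_at(simulate_error: simulate_error)
  return if created.nil? || created.empty?

  Process.clock_gettime(Process::CLOCK_REALTIME) - created.to_f
rescue StandardError => e
  logger.warn("duration metric failed: #{e.class}: #{e.message}")
  nil
end

logger = Logger.new($stderr)
elapsed = duration_seconds(logger)                      # a little over 42.0
failed  = duration_seconds(logger, simulate_error: true) # warns, returns nil
p elapsed, failed
```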

.enqueue_callback_job(bid, target, event, options, queue) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 224

def enqueue_callback_job(bid, target, event, options, queue)
  Wurk::Client.push(
    'class' => 'Wurk::Batch::CallbackJob',
    'args' => [bid, target, event, options],
    'queue' => queue,
    'retry' => true
  )
end

.enqueue_callbacks(bid, event) ⇒ Object

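Enqueues one Wurk::Batch::CallbackJob per registered callback whose event matches `event`, on the queue returned by callback_specs_for. The rescue is per callback: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued, so log and move on and every other callback still fires.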



# File 'lib/wurk/batch/callbacks.rb', line 196

def enqueue_callbacks(bid, event)
  callbacks, queue = callback_specs_for(bid)

  callbacks.each do |(cb_event, target, options)|
    next unless cb_event == event

    enqueue_callback_job(bid, target, event, options, queue)
  rescue StandardError => e
    Wurk.logger.warn("batch #{bid}: #{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}")
  end
end
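
The per-iteration rescue (a `rescue` clause inside a `do ... end` block, Ruby 2.6+) can be demonstrated with a failing enqueue in the middle of the list. A sketch: the `push` lambda is a hypothetical stand-in for `enqueue_callback_job`, and the target names are made up.

```ruby
require 'logger'

# Callback specs as parse_callbacks would return them: [event, target, options].
specs = [
  ['success', 'GoodA', {}],
  ['complete', 'Other', {}],
  ['success', 'Broken', {}],
  ['success', 'GoodB', { 'x' => 1 }]
]

enqueued = []
logger = Logger.new($stderr)

# Hypothetical enqueue that fails for one target, standing in for a bad spec
# or a transient push error.
push = lambda do |target|
  raise ArgumentError, 'unknown target' if target == 'Broken'

  enqueued << target
end

event = 'success'
specs.each do |(cb_event, target, _options)|
  next unless cb_event == event

  push.call(target)
rescue StandardError => e
  # Scoped to this iteration only; the loop moves on to the next spec.
  logger.warn("#{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}")
end

p enqueued # => ["GoodA", "GoodB"]
```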

.fire_complete(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 73

def fire_complete(bid)
  return unless dedup_set(bid, 'complete')

  record_event(bid, 'complete_at')
  enqueue_callbacks(bid, 'complete')
end

.fire_death(bid) ⇒ Object

Fired from Wurk::Batch::DeathHandler whenever a death makes the died set go non-empty: the first death, or the first re-death after every dead jid was manually retried back into the live set (#212; that retry’s BATCH_PUSH cleared the death mark). The mark (durable `death` flag, `death_at`, `dead-batches` membership) is (re-)applied before the dedup guard so it is restored on re-death, while the callback enqueue and parent cascade stay behind the guard so `:death` is enqueued at most once per batch.



# File 'lib/wurk/batch/callbacks.rb', line 52

def fire_death(bid)
  record_event(bid, 'death_at')
  Wurk.redis { |conn| conn.call('ZADD', 'dead-batches', Time.now.to_f.to_s, bid) }
  return unless dedup_set(bid, 'death')

  enqueue_callbacks(bid, 'death')
  cascade_death(bid)
end
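
The mark-before-guard ordering can be checked with a small state model of one batch. A sketch, not the real implementation: `DeathModel` is hypothetical, and `recover` assumes BATCH_PUSH clears only the durable flag, as the prose describes, leaving the dedup key alone.

```ruby
# Hypothetical in-memory model of one batch's death bookkeeping.
class DeathModel
  attr_reader :death_flag, :enqueued

  def initialize
    @death_flag = false # durable `death` field on b-<bid>
    @notified = false   # b-<bid>-death dedup key
    @enqueued = 0       # :death callback enqueues
  end

  # Same ordering as fire_death: apply the mark first, then check the guard.
  def fire_death
    @death_flag = true
    return if @notified

    @notified = true
    @enqueued += 1
  end

  # Assumed effect of BATCH_PUSH (#212) once every dead jid is retried.
  def recover
    @death_flag = false
  end
end

batch = DeathModel.new
batch.fire_death # first death: mark set, :death enqueued
batch.recover    # every dead jid retried back into the live set
batch.fire_death # re-death: mark restored, no second enqueue
p [batch.death_flag, batch.enqueued] # => [true, 1]
```

Had the mark been applied after the guard, the re-death would return early with the flag still cleared, and a later maybe_fire could emit `:success` despite a dead job.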

.fire_success(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 80

def fire_success(bid)
  return unless dedup_set(bid, 'success')

  record_event(bid, 'success_at')
  emit_duration_metric(bid)
  enqueue_callbacks(bid, 'success')
  apply_linger(bid)
end

.kids_finished?(bid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 40

def kids_finished?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-pkids") }.to_i.zero?
end

.live_for(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 263

def live_for(bid)    = Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-jids") }.to_i

.maybe_fire(bid, pending:, live:) ⇒ Object

Called from the server middleware after BATCH_ACK_SUCCESS (and from DeathHandler when a death drains the last live jid). Fires `:complete` when live jids hit 0; fires `:success` when pending also hits 0 and no death remains anywhere in the subtree (subtree_dead?). Either way it then propagates to the parent.

Both fires are additionally gated on `b-<bid>-pkids` being empty; that set holds the children whose own subtrees haven’t finished yet (#209). Spec §2.4: child `:complete`/`:success` fire before the parent’s, so when the parent’s own last job acks while a child batch is still running, nothing fires here; the last child’s propagate_to_parent re-invokes this and fires then. The SREM in pkids_drained? happens before that re-invocation, so at least one of the racing paths sees the drained set, and dedup_set ensures the callbacks are enqueued only once if both do.



# File 'lib/wurk/batch/callbacks.rb', line 31

def maybe_fire(bid, pending:, live:)
  return unless live.zero?
  return unless kids_finished?(bid)

  fire_complete(bid)
  fire_success(bid) if pending.zero? && !subtree_dead?(bid)
  propagate_to_parent(bid)
end
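
The gates in maybe_fire reduce to a small decision table. A minimal sketch with a hypothetical pure function `events_to_fire`; it returns which events would be attempted, and dedup_set may still drop a repeat.

```ruby
# Hypothetical pure version of maybe_fire's gates.
def events_to_fire(live:, pending:, kids_finished:, subtree_dead:)
  return [] unless live.zero?     # jobs still running
  return [] unless kids_finished  # a child subtree is still open

  events = [:complete]
  events << :success if pending.zero? && !subtree_dead
  events
end

p events_to_fire(live: 1, pending: 1, kids_finished: true,  subtree_dead: false) # => []
p events_to_fire(live: 0, pending: 0, kids_finished: false, subtree_dead: false) # => []
p events_to_fire(live: 0, pending: 2, kids_finished: true,  subtree_dead: false) # => [:complete]
p events_to_fire(live: 0, pending: 0, kids_finished: true,  subtree_dead: true)  # => [:complete]
p events_to_fire(live: 0, pending: 0, kids_finished: true,  subtree_dead: false) # => [:complete, :success]
```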

.own_died_remaining?(bid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 184

def own_died_remaining?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-died") }.to_i.positive?
end

.parent_bid_for(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 251

def parent_bid_for(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'parent_bid') }
end

.parse_callbacks(raw) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 216

def parse_callbacks(raw)
  return [] if raw.nil? || raw.empty?

  JSON.parse(raw)
rescue JSON::ParserError
  []
end

.pending_for(bid) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 262

def pending_for(bid) = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'pending') }.to_i

.pkids_drained?(parent_bid, child_bid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 255

def pkids_drained?(parent_bid, child_bid)
  Wurk.redis do |conn|
    conn.call('SREM', "b-#{parent_bid}-pkids", child_bid)
    conn.call('SCARD', "b-#{parent_bid}-pkids").to_i.zero?
  end
end

.propagate_to_parent(bid) ⇒ Object

When a child batch finishes (its live jids hit 0, by success or death), remove it from the parent’s pkids set so the parent’s own callbacks wait on the full subtree. When the parent’s pkids set drains, clear any death mark the parent only held on this subtree’s behalf and re-run the parent’s maybe_fire: if its own counts are already at zero (the parent-acks-first race), this is what finally fires it.



# File 'lib/wurk/batch/callbacks.rb', line 238

def propagate_to_parent(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?
  return unless pkids_drained?(parent_bid, bid)

  # A recovered child may have lifted the last death from the parent's
  # subtree — clear the parent's cascaded mark before its gate runs, so
  # `:success` can fire. Harmless on the death path: the dying child
  # still carries its mark, so any_child_dead? keeps the parent dead.
  clear_death_on_recovery(parent_bid)
  maybe_fire(parent_bid, pending: pending_for(parent_bid), live: live_for(parent_bid))
end
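
The parent-acks-first race can be replayed step by step in memory. A sketch under stated assumptions: Ruby sets and hashes replace the Redis keys, dedup and death handling are left out, and every name here is hypothetical.

```ruby
require 'set'

# Parent P has its own job plus two child batches, C1 and C2.
PKIDS  = { 'P' => Set['C1', 'C2'] }         # b-<bid>-pkids
PARENT = { 'C1' => 'P', 'C2' => 'P' }
COUNTS = { 'P' => { live: 0, pending: 0 } } # P's own job already acked
FIRED  = []

def maybe_fire(bid)
  c = COUNTS[bid]
  return unless c[:live].zero? && PKIDS.fetch(bid, Set.new).empty?

  FIRED << [bid, :complete]
  FIRED << [bid, :success] if c[:pending].zero?
end

def propagate_to_parent(bid)
  parent = PARENT[bid]
  return if parent.nil?

  PKIDS[parent].delete(bid)          # the SREM in pkids_drained?
  return unless PKIDS[parent].empty? # only the last child re-invokes

  maybe_fire(parent)
end

maybe_fire('P')           # parent acks first: children still running
p FIRED                   # => []
propagate_to_parent('C1') # first child finishes
p FIRED                   # => []
propagate_to_parent('C2') # last child finishes, so the parent fires now
p FIRED                   # => [["P", :complete], ["P", :success]]
```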

.record_event(bid, field) ⇒ Object



# File 'lib/wurk/batch/callbacks.rb', line 129

def record_event(bid, field)
  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis do |conn|
    conn.call('HSET', "b-#{bid}", field, now.to_s)
    conn.call('HSET', "b-#{bid}", field.to_s.sub('_at', ''), '1')
  end
end

.subtree_dead?(bid) ⇒ Boolean

A batch’s subtree is still dead while it carries the durable death mark OR any direct child does. Deaths cascade up the parent chain, so a dead descendant keeps the child on the path to it marked, and checking direct children is enough. This gates `:success`, which must never fire while a job in the subtree is terminally dead (spec §2.4). The child check matters for the brief window where a batch with both its own dead job and a dead child has its OWN dead job retried to success: BATCH_PUSH (#212) clears that batch’s own mark when its died set drains, but the child subtree is still dead, so `death_fired?` alone would wrongly let `:success` fire.

Returns:

  • (Boolean)


# File 'lib/wurk/batch/callbacks.rb', line 158

def subtree_dead?(bid)
  death_fired?(bid) || any_child_dead?(bid)
end
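
The window described above can be reproduced with two hashes. A sketch: the lambdas are hypothetical in-memory counterparts of death_fired?, any_child_dead? and subtree_dead?, and the snapshot is batch A after its own dead job was retried while its child B is still dead.

```ruby
# Snapshot: A's own mark was cleared by BATCH_PUSH; child B is still dead.
death_mark = { 'A' => false, 'B' => true }
kids = { 'A' => ['B'], 'B' => [] }

death_fired    = ->(bid) { death_mark.fetch(bid) }
any_child_dead = ->(bid) { kids.fetch(bid).any? { |kid| death_fired.(kid) } }
subtree_dead   = ->(bid) { death_fired.(bid) || any_child_dead.(bid) }

p death_fired.('A')  # => false (the own-mark check alone would allow :success)
p subtree_dead.('A') # => true  (the dead child keeps :success suppressed)
```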