Module: Wurk::Batch::Callbacks
Defined in: lib/wurk/batch/callbacks.rb
Overview
Fires batch callbacks (`:success`, `:complete`, `:death`) by enqueuing them as ordinary jobs on the batch’s `callback_queue`. Dedup is via b-<bid>-notify so the same callback can’t be enqueued twice even if multiple workers race to ack the final job.
Callback wrapper job: Wurk::Batch::CallbackJob — given a target spec (“Klass” or “Klass#method”) and options hash, it instantiates and invokes on_<event> (or the named method) with the Status snapshot.
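As a rough illustration of the target spec described above, the sketch below splits a "Klass#method" string and falls back to on_<event> when no method is named. `resolve_target` and `DemoCallback` are hypothetical names, and the `(status, options)` callback signature is an assumption; the real Wurk::Batch::CallbackJob may resolve and invoke targets differently.

```ruby
# Hypothetical helper, not part of Wurk: splits a target spec into a class
# name and the method to invoke, defaulting to on_<event>.
def resolve_target(target, event)
  klass_name, method_name = target.split('#', 2)
  [klass_name, (method_name || "on_#{event}").to_sym]
end

# Hypothetical callback class used only for this illustration.
class DemoCallback
  def on_success(status, _options)
    "success for #{status}"
  end

  def notify(status, _options)
    "notified for #{status}"
  end
end

klass_name, method_name = resolve_target('DemoCallback', 'success')
# A plain string stands in for the Status snapshot here.
result = Object.const_get(klass_name).new.public_send(method_name, 'bid-1', {})
```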
Class Method Summary
- .any_child_dead?(bid) ⇒ Boolean
- .apply_linger(bid) ⇒ Object
  Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch `linger` override (else 24h) instead of the 30d pending TTL.
- .callback_specs_for(bid) ⇒ Object
- .cascade_death(bid) ⇒ Object
  A child’s death means the parent, and every ancestor, can never fully succeed, so `:death` propagates up the parent chain.
- .clear_death_on_recovery(bid) ⇒ Object
  Recovery counterpart to cascade_death (#226).
- .death_fired?(bid) ⇒ Boolean
  True once `:death` has fired for this batch, either from one of its own jobs dying or from a descendant’s death cascading up.
- .dedup_set(bid, event) ⇒ Object
  Atomically marks `bid` as having fired `event`.
- .emit_duration_metric(bid) ⇒ Object
  Pro statsd metric (spec §9.3): wall-clock seconds from batch creation to full success.
- .enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
- .enqueue_callbacks(bid, event) ⇒ Object
  Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued.
- .fire_complete(bid) ⇒ Object
- .fire_death(bid) ⇒ Object
  Fired from Wurk::Batch::DeathHandler whenever a death makes the died set go non-empty: the first death, or the first re-death after every dead jid was manually retried back into the live set (#212: that retry’s BATCH_PUSH cleared the death mark).
- .fire_success(bid) ⇒ Object
- .kids_finished?(bid) ⇒ Boolean
- .live_for(bid) ⇒ Object
- .maybe_fire(bid, pending:, live:) ⇒ Object
  Called from the server middleware after BATCH_ACK_SUCCESS (and from DeathHandler when a death drains the last live jid).
- .own_died_remaining?(bid) ⇒ Boolean
- .parent_bid_for(bid) ⇒ Object
- .parse_callbacks(raw) ⇒ Object
- .pending_for(bid) ⇒ Object
- .pkids_drained?(parent_bid, child_bid) ⇒ Boolean
- .propagate_to_parent(bid) ⇒ Object
  When a child batch finishes (its live jids hit 0, by success or death), remove it from the parent’s pkids set so the parent’s own callbacks wait on the full subtree.
- .record_event(bid, field) ⇒ Object
- .subtree_dead?(bid) ⇒ Boolean
  A batch’s subtree is still dead while it carries the durable death mark OR any direct child does. Deaths cascade up the parent chain, so a dead descendant keeps every ancestor’s child marked.
Class Method Details
.any_child_dead?(bid) ⇒ Boolean
# File 'lib/wurk/batch/callbacks.rb', line 188
def any_child_dead?(bid)
  kids = Wurk.redis { |conn| conn.call('SMEMBERS', "b-#{bid}-kids") }
  kids.any? { |kid| death_fired?(kid) }
end
.apply_linger(bid) ⇒ Object
Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch `linger` override (else 24h) instead of the 30d pending TTL. Mirrors Sidekiq Pro §2.8.
# File 'lib/wurk/batch/callbacks.rb', line 111
def apply_linger(bid)
  raw = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'linger') }
  seconds = raw.nil? || raw.to_s.empty? ? Batch::POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
  Wurk.redis do |conn|
    Batch.keys_for(bid).each { |key| conn.call('EXPIRE', key, seconds) }
  end
end
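The TTL choice in apply_linger can be isolated as a pure function, which makes the fallback easy to check. This is a sketch only: `linger_seconds` and `DEFAULT_LINGER_SECONDS` are hypothetical names, and the default is assumed to be 24h from the description above rather than read from Batch::POST_SUCCESS_EXPIRY_SECONDS.

```ruby
# Assumed default, taken from the "else 24h" wording in the description; the
# real value lives in Batch::POST_SUCCESS_EXPIRY_SECONDS.
DEFAULT_LINGER_SECONDS = 24 * 60 * 60

# Hypothetical pure version of the TTL choice made in apply_linger.
def linger_seconds(raw)
  raw.nil? || raw.to_s.empty? ? DEFAULT_LINGER_SECONDS : raw.to_i
end

missing  = linger_seconds(nil)     # no override stored on the batch hash
blank    = linger_seconds('')      # empty override
override = linger_seconds('3600')  # per-batch override in seconds
```

Because the override goes through `to_i`, a non-numeric string would parse to 0 rather than fall back to the default.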
.callback_specs_for(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 208
def callback_specs_for(bid)
  raw = Wurk.redis { |conn| conn.call('HMGET', "b-#{bid}", 'callbacks', 'callback_queue') }
  callbacks_json, queue = raw
  queue = 'default' if queue.nil? || queue.empty?
  parsed = parse_callbacks(callbacks_json)
  [parsed, queue]
end
.cascade_death(bid) ⇒ Object
A child’s death means the parent, and every ancestor, can never fully succeed, so `:death` propagates up the parent chain. The recursion bottoms out at the root (empty parent_bid); fire_death’s own dedup_set guard makes each ancestor’s `:death` fire exactly once even under racing children.
# File 'lib/wurk/batch/callbacks.rb', line 66
def cascade_death(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?

  fire_death(parent_bid)
end
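The mutual recursion between cascade_death and fire_death can be traced with an in-memory model. The sketch below is a simplification under stated assumptions: `PARENTS`, `FIRED`, and `ENQUEUED` are hypothetical stand-ins for Redis state, and the mark that the real fire_death applies before its guard is omitted.

```ruby
require 'set'

# In-memory stand-ins for Redis state (hypothetical, for illustration only).
PARENTS = { 'grandchild' => 'child', 'child' => 'root', 'root' => '' }.freeze
FIRED = Set.new
ENQUEUED = []

def fire_death(bid)
  # Set#add? returns nil when the element is already present, playing the
  # role of the SET NX dedup guard.
  return unless FIRED.add?(bid)

  ENQUEUED << bid
  cascade_death(bid)
end

def cascade_death(bid)
  parent = PARENTS[bid]
  # The root has an empty parent_bid, which ends the recursion.
  return if parent.nil? || parent.empty?

  fire_death(parent)
end

fire_death('grandchild')
fire_death('grandchild') # duplicate call: absorbed by the guard
```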
.clear_death_on_recovery(bid) ⇒ Object
Recovery counterpart to cascade_death (#226). When a descendant’s last dead job is manually retried back to success, the descendant clears its OWN death mark (#212, in BATCH_PUSH), but every ancestor was marked by the death cascade, not by a jid in its own died set, so nothing here ever cleared them and the ancestor’s `:success` stayed suppressed forever. Re-evaluate this batch: drop its durable death mark and `dead-batches` membership once its own died set is empty AND no child still carries a death mark. The `b-<bid>-death` notify dedup key is deliberately left intact, so a later re-death re-marks the batch (fire_death restores the flag before its own dedup guard) without ever re-enqueuing `:death`.
# File 'lib/wurk/batch/callbacks.rb', line 173
def clear_death_on_recovery(bid)
  return unless death_fired?(bid)
  return if own_died_remaining?(bid)
  return if any_child_dead?(bid)

  Wurk.redis do |conn|
    conn.call('HDEL', "b-#{bid}", 'death')
    conn.call('ZREM', 'dead-batches', bid)
  end
end
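The three guards above reduce to a single predicate. A minimal sketch, assuming the three Redis reads have already been resolved into plain values; `clearable?` and its keyword arguments are hypothetical names.

```ruby
# Hypothetical predicate mirroring the guard order in clear_death_on_recovery.
def clearable?(death_fired:, own_died:, dead_children:)
  return false unless death_fired          # nothing to clear
  return false if own_died.positive?       # batch still has its own dead jids
  return false if dead_children.positive?  # a child still carries a mark

  true
end

recovered   = clearable?(death_fired: true, own_died: 0, dead_children: 0)
still_dead  = clearable?(death_fired: true, own_died: 0, dead_children: 1)
never_fired = clearable?(death_fired: false, own_died: 0, dead_children: 0)
```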
.death_fired?(bid) ⇒ Boolean
True once `:death` has fired for this batch, either from one of its own jobs dying or from a descendant’s death cascading up. Suppresses `:success`, which must never fire after any death in the subtree.
Reads the durable `death` field on `b-<bid>` (written by `record_event`), not the `b-<bid>-death` dedup key. The dedup key has its own 30d TTL and can expire while an ancestor batch is still open, after which a late `maybe_fire` would wrongly emit `:success`.
# File 'lib/wurk/batch/callbacks.rb', line 145
def death_fired?(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'death') } == '1'
end
.dedup_set(bid, event) ⇒ Object
Atomically marks `bid` as having fired `event`. Returns true the first time, false thereafter; the caller skips the enqueue when false. SET NX makes this safe under racing acks.
# File 'lib/wurk/batch/callbacks.rb', line 122
def dedup_set(bid, event)
  Wurk.redis do |conn|
    ok = conn.call('SET', "b-#{bid}-#{event}", '1', 'NX', 'EX', Batch::CALLBACK_NOTIFY_TTL)
    ok == 'OK'
  end
end
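The first-writer-wins behaviour that dedup_set relies on can be sketched without Redis. `FakeNxStore` below is a hypothetical in-memory stand-in, with a Mutex playing the role of Redis's single-threaded command execution; it ignores the EX expiry entirely.

```ruby
# Hypothetical in-memory stand-in for Redis SET ... NX: the first writer of a
# key wins, every later writer is told the key already exists.
class FakeNxStore
  def initialize
    @keys = {}
    @lock = Mutex.new
  end

  # Returns true only for the call that created the key.
  def set_nx(key, value)
    @lock.synchronize do
      next false if @keys.key?(key)

      @keys[key] = value
      true
    end
  end
end

store = FakeNxStore.new
# Eight "workers" race to mark the same batch/event pair.
results = 8.times.map do
  Thread.new { store.set_nx('b-abc123-success', '1') }
end.map(&:value)
```

Exactly one of the racing calls sees `true`, so only that caller would go on to enqueue callbacks.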
.emit_duration_metric(bid) ⇒ Object
Pro statsd metric (spec §9.3): wall-clock seconds from batch creation to full success. `created_at` shares the CLOCK_REALTIME epoch we record it with. No-op without a dogstatsd client.
Strictly best-effort: `fire_success` has already burned the `success` dedup key by the time we run, so a raise here (e.g. a Redis hiccup on the HGET) would permanently strand the success callbacks and linger that follow, because a retry can’t re-fire them. Swallow and log instead.
# File 'lib/wurk/batch/callbacks.rb', line 97
def emit_duration_metric(bid)
  created = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'created_at') }
  return if created.nil? || created.to_s.empty?

  seconds = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - created.to_f
  Wurk::Metrics::Statsd.distribution('batch.duration_dist', seconds)
rescue StandardError => e
  Wurk.logger.warn("batch #{bid}: duration metric emit failed: #{e.class}: #{e.message}")
  nil
end
.enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 224
def enqueue_callback_job(bid, target, event, options, queue)
  Wurk::Client.push(
    'class' => 'Wurk::Batch::CallbackJob',
    'args' => [bid, target, event, options],
    'queue' => queue,
    'retry' => true
  )
end
.enqueue_callbacks(bid, event) ⇒ Object
Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued. Log and move on so every other callback still fires.
# File 'lib/wurk/batch/callbacks.rb', line 196
def enqueue_callbacks(bid, event)
  callbacks, queue = callback_specs_for(bid)
  callbacks.each do |(cb_event, target, options)|
    next unless cb_event == event

    enqueue_callback_job(bid, target, event, options, queue)
  rescue StandardError => e
    Wurk.logger.warn("batch #{bid}: #{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}")
  end
end
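To see why the rescue sits inside the loop rather than around it, the sketch below runs the same iteration shape against made-up data. The targets, the `enqueue` lambda, and the `warnings` array are all hypothetical; only the `[event, target, options]` triple shape comes from the listing above.

```ruby
# Callback triples shaped like the ones iterated in enqueue_callbacks:
# [event, target, options]. The targets are made up for this example.
callbacks = [
  ['success', 'BadSpec', {}],
  ['success', 'Mailer#batch_done', { 'to' => 'ops' }],
  ['death', 'Pager', {}]
]

enqueued = []
warnings = []

# Hypothetical enqueue step that fails for one target.
enqueue = lambda do |target|
  raise ArgumentError, 'cannot enqueue' if target == 'BadSpec'

  enqueued << target
end

callbacks.each do |(cb_event, target, _options)|
  next unless cb_event == 'success'

  enqueue.call(target)
rescue StandardError => e
  # One failure is logged; the loop continues with the next callback.
  warnings << "#{target.inspect} enqueue failed: #{e.class}: #{e.message}"
end
```

With a single rescue wrapped around the whole loop, the failure on the first spec would have skipped the second one as well.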
.fire_complete(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 73
def fire_complete(bid)
  return unless dedup_set(bid, 'complete')

  record_event(bid, 'complete_at')
  enqueue_callbacks(bid, 'complete')
end
.fire_death(bid) ⇒ Object
Fired from Wurk::Batch::DeathHandler whenever a death makes the died set go non-empty: the first death, or the first re-death after every dead jid was manually retried back into the live set (#212: that retry’s BATCH_PUSH cleared the death mark). The mark (durable `death` flag, `death_at`, `dead-batches` membership) is (re-)applied before the dedup guard so it is restored on re-death; the callback enqueue and parent cascade stay behind the guard so `:death` is enqueued at most once per batch.
# File 'lib/wurk/batch/callbacks.rb', line 52
def fire_death(bid)
  record_event(bid, 'death_at')
  Wurk.redis { |conn| conn.call('ZADD', 'dead-batches', Time.now.to_f.to_s, bid) }
  return unless dedup_set(bid, 'death')

  enqueue_callbacks(bid, 'death')
  cascade_death(bid)
end
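The ordering of mark versus guard is the important detail here, and a small state machine makes it visible. This is an illustrative sketch: the `state` hash and its keys are hypothetical stand-ins for the Redis fields and the dedup key.

```ruby
# Hypothetical in-memory batch state standing in for the Redis keys.
state = { death_flag: false, dedup: false, enqueued: 0 }

fire_death = lambda do
  state[:death_flag] = true   # mark is (re-)applied unconditionally
  next if state[:dedup]       # guard: callbacks were already enqueued once

  state[:dedup] = true
  state[:enqueued] += 1
end

fire_death.call               # first death
state[:death_flag] = false    # manual retry clears the mark (#212)
fire_death.call               # re-death
```

After the re-death the flag is set again, while the enqueue count stays at one.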
.fire_success(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 80
def fire_success(bid)
  return unless dedup_set(bid, 'success')

  record_event(bid, 'success_at')
  emit_duration_metric(bid)
  enqueue_callbacks(bid, 'success')
  apply_linger(bid)
end
.kids_finished?(bid) ⇒ Boolean
# File 'lib/wurk/batch/callbacks.rb', line 40
def kids_finished?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-pkids") }.to_i.zero?
end
.live_for(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 263
def live_for(bid) = Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-jids") }.to_i
.maybe_fire(bid, pending:, live:) ⇒ Object
Called from the server middleware after BATCH_ACK_SUCCESS (and from DeathHandler when a death drains the last live jid). Fires `:complete` when live jids hit 0; fires `:success` when pending also hits 0 and there have been no deaths.
Both fires are additionally gated on `b-<bid>-pkids` being empty; that set holds the children whose own subtree hasn’t finished yet (#209). Spec §2.4: child `:complete`/`:success` fire before the parent’s, so when the parent’s own last job acks while a child batch is still running, nothing fires here; the last child’s propagate_to_parent re-invokes this and fires then. The SREM in pkids_drained? happens before that re-invocation, so exactly one of the racing paths fires (dedup_set absorbs the overlap).
# File 'lib/wurk/batch/callbacks.rb', line 31
def maybe_fire(bid, pending:, live:)
  return unless live.zero?
  return unless kids_finished?(bid)

  fire_complete(bid)
  fire_success(bid) if pending.zero? && !subtree_dead?(bid)
  propagate_to_parent(bid)
end
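The gate order can be restated as a pure function that returns the events that would fire, which is convenient for reasoning about each case. `events_to_fire` is a hypothetical name; the Redis-backed checks are passed in as booleans, and dedup and parent propagation are left out.

```ruby
# Hypothetical pure function mirroring the gate order in maybe_fire. It
# returns the events that would fire instead of touching Redis.
def events_to_fire(pending:, live:, kids_finished:, subtree_dead:)
  return [] unless live.zero?
  return [] unless kids_finished

  events = [:complete]
  events << :success if pending.zero? && !subtree_dead
  events
end

still_running = events_to_fire(pending: 1, live: 1, kids_finished: true, subtree_dead: false)
waiting_kids  = events_to_fire(pending: 0, live: 0, kids_finished: false, subtree_dead: false)
all_good      = events_to_fire(pending: 0, live: 0, kids_finished: true, subtree_dead: false)
after_death   = events_to_fire(pending: 1, live: 0, kids_finished: true, subtree_dead: true)
```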
.own_died_remaining?(bid) ⇒ Boolean
# File 'lib/wurk/batch/callbacks.rb', line 184
def own_died_remaining?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-died") }.to_i.positive?
end
.parent_bid_for(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 251
def parent_bid_for(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'parent_bid') }
end
.parse_callbacks(raw) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 216
def parse_callbacks(raw)
  return [] if raw.nil? || raw.empty?

  JSON.parse(raw)
rescue JSON::ParserError
  []
end
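A standalone copy of this method shows how it behaves on the edge cases. The JSON payload shape used below (an array of `[event, target, options]` triples) is an assumption inferred from how enqueue_callbacks destructures each entry, not something this page specifies.

```ruby
require 'json'

# Standalone copy of parse_callbacks for experimentation.
def parse_callbacks(raw)
  return [] if raw.nil? || raw.empty?

  JSON.parse(raw)
rescue JSON::ParserError
  []
end

missing   = parse_callbacks(nil)        # field never written
malformed = parse_callbacks('{oops')    # corrupt JSON degrades to no callbacks
# Assumed payload shape: [[event, target, options], ...]
parsed    = parse_callbacks('[["success","Klass#done",{"k":"v"}]]')
```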
.pending_for(bid) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 262
def pending_for(bid) = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'pending') }.to_i
.pkids_drained?(parent_bid, child_bid) ⇒ Boolean
# File 'lib/wurk/batch/callbacks.rb', line 255
def pkids_drained?(parent_bid, child_bid)
  Wurk.redis do |conn|
    conn.call('SREM', "b-#{parent_bid}-pkids", child_bid)
    conn.call('SCARD', "b-#{parent_bid}-pkids").to_i.zero?
  end
end
.propagate_to_parent(bid) ⇒ Object
When a child batch finishes (its live jids hit 0 — by success or death), remove it from the parent’s pkids set so the parent’s own callbacks wait on the full subtree. When the parent’s pkids hits 0, re-run the parent’s maybe_fire: if its own counts are already at zero (the parent-acks-first race), this is what finally fires it.
# File 'lib/wurk/batch/callbacks.rb', line 238
def propagate_to_parent(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?
  return unless pkids_drained?(parent_bid, bid)

  # A recovered child may have lifted the last death from the parent's
  # subtree — clear the parent's cascaded mark before its gate runs, so
  # `:success` can fire. Harmless on the death path: the dying child
  # still carries its mark, so any_child_dead? keeps the parent dead.
  clear_death_on_recovery(parent_bid)
  maybe_fire(parent_bid, pending: pending_for(parent_bid), live: live_for(parent_bid))
end
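The remove-then-count step that decides whether the parent is re-evaluated can be modelled with a Ruby Set. The child bids and the `drained` lambda below are hypothetical; the sketch runs sequentially and does not model concurrent children.

```ruby
require 'set'

# Hypothetical in-memory version of the parent's pkids set.
pkids = Set.new(%w[child-a child-b])

# Mirrors pkids_drained?: remove the finished child, then report whether the
# set is now empty.
drained = lambda do |child_bid|
  pkids.delete(child_bid)
  pkids.empty?
end

first  = drained.call('child-a') # one child still outstanding
second = drained.call('child-b') # last child: parent gets re-evaluated
```

Only the call that empties the set returns true, so only the last child to finish would go on to re-run the parent's maybe_fire.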
.record_event(bid, field) ⇒ Object
# File 'lib/wurk/batch/callbacks.rb', line 129
def record_event(bid, field)
  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis do |conn|
    conn.call('HSET', "b-#{bid}", field, now.to_s)
    conn.call('HSET', "b-#{bid}", field.to_s.sub('_at', ''), '1')
  end
end
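record_event writes two hash fields per call: the timestamp field it is given and a flag field derived by stripping `_at`. The short sketch below only reproduces that name derivation for the three field names used on this page; `flag_fields` is a hypothetical variable.

```ruby
# Derive the flag field written alongside each timestamp field, using the
# same String#sub call as record_event.
flag_fields = %w[complete_at success_at death_at].to_h do |field|
  [field, field.sub('_at', '')]
end
```

The derived `death` name matches the field that death_fired? reads from the batch hash.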
.subtree_dead?(bid) ⇒ Boolean
A batch’s subtree is still dead while it carries the durable death mark OR any direct child does. Deaths cascade up the parent chain, so a dead descendant keeps every ancestor’s child marked. This gates `:success`, which must never fire while a job in the subtree is terminally dead (spec §2.4). The child check matters for the brief window where a batch with both its own dead job and a dead child has its OWN dead job retried to success: BATCH_PUSH (#212) clears that batch’s own mark when its died set drains, but the child subtree is still dead, so `death_fired?` alone would wrongly let `:success` fire.
# File 'lib/wurk/batch/callbacks.rb', line 158
def subtree_dead?(bid)
  death_fired?(bid) || any_child_dead?(bid)
end
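The window described above can be reproduced with an in-memory model. This is a sketch under stated assumptions: `death_mark` and `kids` are hypothetical hashes standing in for the `death` field and the `b-<bid>-kids` set, and the lambdas mirror the three predicates on this page.

```ruby
# Hypothetical in-memory state: which batches carry the durable death mark,
# and each batch's direct children.
death_mark = { 'parent' => true, 'child' => true }
kids = { 'parent' => ['child'], 'child' => [] }

death_fired    = ->(bid) { death_mark[bid] == true }
any_child_dead = ->(bid) { kids.fetch(bid, []).any? { |kid| death_fired.call(kid) } }
subtree_dead   = ->(bid) { death_fired.call(bid) || any_child_dead.call(bid) }

# The parent's own dead job is retried to success, clearing only its own mark.
death_mark['parent'] = false

own_mark_only = death_fired.call('parent')  # would let :success through
with_children = subtree_dead.call('parent') # still blocks :success
```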