Module: Wurk::Batch::Callbacks
- Defined in:
- lib/wurk/batch/callbacks.rb
Overview
Fires batch callbacks (‘:success`, `:complete`, `:death`) by enqueuing them as ordinary jobs on the batch’s ‘callback_queue`. Dedup is via b-<bid>-notify so the same callback can’t be enqueued twice even if multiple workers race to ack the final job.
Callback wrapper job: Wurk::Batch::CallbackJob — given a target spec (“Klass” or “Klass#method”) and options hash, it instantiates and invokes on_<event> (or the named method) with the Status snapshot.
Class Method Summary collapse
-
.apply_linger(bid) ⇒ Object
Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch ‘linger` override (else 24h) instead of the 30d pending TTL.
- .callback_specs_for(bid) ⇒ Object
-
.cascade_death(bid) ⇒ Object
A child’s death means the parent — and every ancestor — can never fully succeed, so ‘:death` propagates up the parent chain.
-
.death_fired?(bid) ⇒ Boolean
True once ‘:death` has fired for this batch — from one of its own jobs dying or from a descendant’s death cascading up.
-
.dedup_set(bid, event) ⇒ Object
Atomically marks ‘bid` as having fired `event`.
- .enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
-
.enqueue_callbacks(bid, event) ⇒ Object
Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued.
- .fire_complete(bid) ⇒ Object
-
.fire_death(bid) ⇒ Object
Fired from Wurk::Batch::DeathHandler on the FIRST permanent death in the batch only.
- .fire_success(bid) ⇒ Object
- .live_for(bid) ⇒ Object
-
.maybe_fire(bid, pending:, live:) ⇒ Object
Called from the server middleware after BATCH_ACK_SUCCESS.
- .parent_bid_for(bid) ⇒ Object
- .parse_callbacks(raw) ⇒ Object
- .pending_for(bid) ⇒ Object
- .pkids_drained?(parent_bid, child_bid) ⇒ Boolean
-
.propagate_to_parent(bid) ⇒ Object
When a child batch’s ‘:success` fires, decrement the parent’s pkids set so the parent’s own ‘:success` waits on the full subtree.
- .record_event(bid, field) ⇒ Object
Class Method Details
.apply_linger(bid) ⇒ Object
Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch ‘linger` override (else 24h) instead of the 30d pending TTL. Mirrors Sidekiq Pro §2.8.
71 72 73 74 75 76 77 |
# File 'lib/wurk/batch/callbacks.rb', line 71 def apply_linger(bid) raw = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'linger') } seconds = raw.nil? || raw.to_s.empty? ? Batch::POST_SUCCESS_EXPIRY_SECONDS : raw.to_i Wurk.redis do |conn| Batch.keys_for(bid).each { |key| conn.call('EXPIRE', key, seconds) } end end |
.callback_specs_for(bid) ⇒ Object
124 125 126 127 128 129 130 |
# File 'lib/wurk/batch/callbacks.rb', line 124 def callback_specs_for(bid) raw = Wurk.redis { |conn| conn.call('HMGET', "b-#{bid}", 'callbacks', 'callback_queue') } callbacks_json, queue = raw queue = 'default' if queue.nil? || queue.empty? parsed = parse_callbacks(callbacks_json) [parsed, queue] end |
.cascade_death(bid) ⇒ Object
A child’s death means the parent — and every ancestor — can never fully succeed, so ‘:death` propagates up the parent chain. The recursion bottoms out at the root (empty parent_bid); fire_death’s own dedup_set guard makes each ancestor’s ‘:death` fire exactly once even under racing children.
46 47 48 49 50 51 |
# File 'lib/wurk/batch/callbacks.rb', line 46 def cascade_death(bid) parent_bid = parent_bid_for(bid) return if parent_bid.nil? || parent_bid.empty? fire_death(parent_bid) end |
.death_fired?(bid) ⇒ Boolean
True once ‘:death` has fired for this batch — from one of its own jobs dying or from a descendant’s death cascading up. Suppresses ‘:success`, which must never fire after any death in the subtree.
Reads the durable ‘death` field on `b-<bid>` (written by `record_event`), not the `b-<bid>-death` dedup key — the dedup key has its own 30d TTL and can expire while an ancestor batch is still open, after which a late `maybe_fire` would wrongly emit `:success`.
105 106 107 |
# File 'lib/wurk/batch/callbacks.rb', line 105 def death_fired?(bid) Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'death') } == '1' end |
.dedup_set(bid, event) ⇒ Object
Atomically marks ‘bid` as having fired `event`. Returns true the first time, false thereafter — caller skips the enqueue when false. SET NX makes this safe under racing acks.
82 83 84 85 86 87 |
# File 'lib/wurk/batch/callbacks.rb', line 82 def dedup_set(bid, event) Wurk.redis do |conn| ok = conn.call('SET', "b-#{bid}-#{event}", '1', 'NX', 'EX', Batch::CALLBACK_NOTIFY_TTL) ok == 'OK' end end |
.enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/wurk/batch/callbacks.rb', line 140 def enqueue_callback_job(bid, target, event, , queue) Wurk::Client.push( 'class' => 'Wurk::Batch::CallbackJob', 'args' => [bid, target, event, ], 'queue' => queue, 'retry' => true ) end |
.enqueue_callbacks(bid, event) ⇒ Object
Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued. Log and move on so every other callback still fires.
112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/wurk/batch/callbacks.rb', line 112 def enqueue_callbacks(bid, event) callbacks, queue = callback_specs_for(bid) callbacks.each do |(cb_event, target, )| next unless cb_event == event enqueue_callback_job(bid, target, event, , queue) rescue StandardError => e Wurk.logger.warn("batch #{bid}: #{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.}") end end |
.fire_complete(bid) ⇒ Object
53 54 55 56 57 58 |
# File 'lib/wurk/batch/callbacks.rb', line 53 def fire_complete(bid) return unless dedup_set(bid, 'complete') record_event(bid, 'complete_at') enqueue_callbacks(bid, 'complete') end |
.fire_death(bid) ⇒ Object
Fired from Wurk::Batch::DeathHandler on the FIRST permanent death in the batch only. Subsequent deaths bump the counter but do not re-enqueue the callback.
32 33 34 35 36 37 38 39 |
# File 'lib/wurk/batch/callbacks.rb', line 32 def fire_death(bid) return unless dedup_set(bid, 'death') record_event(bid, 'death_at') Wurk.redis { |conn| conn.call('ZADD', 'dead-batches', Time.now.to_f.to_s, bid) } enqueue_callbacks(bid, 'death') cascade_death(bid) end |
.fire_success(bid) ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/wurk/batch/callbacks.rb', line 60 def fire_success(bid) return unless dedup_set(bid, 'success') record_event(bid, 'success_at') enqueue_callbacks(bid, 'success') apply_linger(bid) end |
.live_for(bid) ⇒ Object
173 |
# File 'lib/wurk/batch/callbacks.rb', line 173 def live_for(bid) = Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-jids") }.to_i |
.maybe_fire(bid, pending:, live:) ⇒ Object
Called from the server middleware after BATCH_ACK_SUCCESS. Fires ‘:complete` when live jids hit 0; fires `:success` when pending also hits 0 and there have been no deaths.
21 22 23 24 25 26 27 |
# File 'lib/wurk/batch/callbacks.rb', line 21 def maybe_fire(bid, pending:, live:) return unless live.zero? fire_complete(bid) fire_success(bid) if pending.zero? && !death_fired?(bid) propagate_to_parent(bid) end |
.parent_bid_for(bid) ⇒ Object
161 162 163 |
# File 'lib/wurk/batch/callbacks.rb', line 161 def parent_bid_for(bid) Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'parent_bid') } end |
.parse_callbacks(raw) ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/wurk/batch/callbacks.rb', line 132 def parse_callbacks(raw) return [] if raw.nil? || raw.empty? JSON.parse(raw) rescue JSON::ParserError [] end |
.pending_for(bid) ⇒ Object
172 |
# File 'lib/wurk/batch/callbacks.rb', line 172 def pending_for(bid) = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'pending') }.to_i |
.pkids_drained?(parent_bid, child_bid) ⇒ Boolean
165 166 167 168 169 170 |
# File 'lib/wurk/batch/callbacks.rb', line 165 def pkids_drained?(parent_bid, child_bid) Wurk.redis do |conn| conn.call('SREM', "b-#{parent_bid}-pkids", child_bid) conn.call('SCARD', "b-#{parent_bid}-pkids").to_i.zero? end end |
.propagate_to_parent(bid) ⇒ Object
When a child batch’s ‘:success` fires, decrement the parent’s pkids set so the parent’s own ‘:success` waits on the full subtree. When parent’s pkids hits 0 and its own pending is 0, parent’s success fires too.
153 154 155 156 157 158 159 |
# File 'lib/wurk/batch/callbacks.rb', line 153 def propagate_to_parent(bid) parent_bid = parent_bid_for(bid) return if parent_bid.nil? || parent_bid.empty? return unless pkids_drained?(parent_bid, bid) maybe_fire(parent_bid, pending: pending_for(parent_bid), live: live_for(parent_bid)) end |
.record_event(bid, field) ⇒ Object
89 90 91 92 93 94 95 |
# File 'lib/wurk/batch/callbacks.rb', line 89 def record_event(bid, field) now = ::Process.clock_gettime(::Process::CLOCK_REALTIME) Wurk.redis do |conn| conn.call('HSET', "b-#{bid}", field, now.to_s) conn.call('HSET', "b-#{bid}", field.to_s.sub('_at', ''), '1') end end |