Module: Wurk::Batch::Callbacks

Defined in:
lib/wurk/batch/callbacks.rb

Overview

Fires batch callbacks (‘:success`, `:complete`, `:death`) by enqueuing them as ordinary jobs on the batch’s ‘callback_queue`. Dedup is via b-<bid>-notify so the same callback can’t be enqueued twice even if multiple workers race to ack the final job.

Callback wrapper job: Wurk::Batch::CallbackJob — given a target spec (“Klass” or “Klass#method”) and options hash, it instantiates and invokes on_<event> (or the named method) with the Status snapshot.

Class Method Summary collapse

Class Method Details

.apply_linger(bid) ⇒ Object

Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch ‘linger` override (else 24h) instead of the 30d pending TTL. Mirrors Sidekiq Pro §2.8.



71
72
73
74
75
76
77
# File 'lib/wurk/batch/callbacks.rb', line 71

def apply_linger(bid)
  raw     = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'linger') }
  seconds = raw.nil? || raw.to_s.empty? ? Batch::POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
  Wurk.redis do |conn|
    Batch.keys_for(bid).each { |key| conn.call('EXPIRE', key, seconds) }
  end
end

.callback_specs_for(bid) ⇒ Object



124
125
126
127
128
129
130
# File 'lib/wurk/batch/callbacks.rb', line 124

def callback_specs_for(bid)
  raw = Wurk.redis { |conn| conn.call('HMGET', "b-#{bid}", 'callbacks', 'callback_queue') }
  callbacks_json, queue = raw
  queue = 'default' if queue.nil? || queue.empty?
  parsed = parse_callbacks(callbacks_json)
  [parsed, queue]
end

.cascade_death(bid) ⇒ Object

A child’s death means the parent — and every ancestor — can never fully succeed, so ‘:death` propagates up the parent chain. The recursion bottoms out at the root (empty parent_bid); fire_death’s own dedup_set guard makes each ancestor’s ‘:death` fire exactly once even under racing children.



46
47
48
49
50
51
# File 'lib/wurk/batch/callbacks.rb', line 46

def cascade_death(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?

  fire_death(parent_bid)
end

.death_fired?(bid) ⇒ Boolean

True once ‘:death` has fired for this batch — from one of its own jobs dying or from a descendant’s death cascading up. Suppresses ‘:success`, which must never fire after any death in the subtree.

Reads the durable ‘death` field on `b-<bid>` (written by `record_event`), not the `b-<bid>-death` dedup key — the dedup key has its own 30d TTL and can expire while an ancestor batch is still open, after which a late `maybe_fire` would wrongly emit `:success`.

Returns:

  • (Boolean)


105
106
107
# File 'lib/wurk/batch/callbacks.rb', line 105

def death_fired?(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'death') } == '1'
end

.dedup_set(bid, event) ⇒ Object

Atomically marks ‘bid` as having fired `event`. Returns true the first time, false thereafter — caller skips the enqueue when false. SET NX makes this safe under racing acks.



82
83
84
85
86
87
# File 'lib/wurk/batch/callbacks.rb', line 82

def dedup_set(bid, event)
  Wurk.redis do |conn|
    ok = conn.call('SET', "b-#{bid}-#{event}", '1', 'NX', 'EX', Batch::CALLBACK_NOTIFY_TTL)
    ok == 'OK'
  end
end

.enqueue_callback_job(bid, target, event, options, queue) ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/wurk/batch/callbacks.rb', line 140

def enqueue_callback_job(bid, target, event, options, queue)
  Wurk::Client.push(
    'class' => 'Wurk::Batch::CallbackJob',
    'args' => [bid, target, event, options],
    'queue' => queue,
    'retry' => true
  )
end

.enqueue_callbacks(bid, event) ⇒ Object

Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued. Log and move on so every other callback still fires.



112
113
114
115
116
117
118
119
120
121
122
# File 'lib/wurk/batch/callbacks.rb', line 112

def enqueue_callbacks(bid, event)
  callbacks, queue = callback_specs_for(bid)

  callbacks.each do |(cb_event, target, options)|
    next unless cb_event == event

    enqueue_callback_job(bid, target, event, options, queue)
  rescue StandardError => e
    Wurk.logger.warn("batch #{bid}: #{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}")
  end
end

.fire_complete(bid) ⇒ Object



53
54
55
56
57
58
# File 'lib/wurk/batch/callbacks.rb', line 53

def fire_complete(bid)
  return unless dedup_set(bid, 'complete')

  record_event(bid, 'complete_at')
  enqueue_callbacks(bid, 'complete')
end

.fire_death(bid) ⇒ Object

Fired from Wurk::Batch::DeathHandler on the FIRST permanent death in the batch only. Subsequent deaths bump the counter but do not re-enqueue the callback.



32
33
34
35
36
37
38
39
# File 'lib/wurk/batch/callbacks.rb', line 32

def fire_death(bid)
  return unless dedup_set(bid, 'death')

  record_event(bid, 'death_at')
  Wurk.redis { |conn| conn.call('ZADD', 'dead-batches', Time.now.to_f.to_s, bid) }
  enqueue_callbacks(bid, 'death')
  cascade_death(bid)
end

.fire_success(bid) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/wurk/batch/callbacks.rb', line 60

def fire_success(bid)
  return unless dedup_set(bid, 'success')

  record_event(bid, 'success_at')
  enqueue_callbacks(bid, 'success')
  apply_linger(bid)
end

.live_for(bid) ⇒ Object



173
# File 'lib/wurk/batch/callbacks.rb', line 173

def live_for(bid)    = Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-jids") }.to_i

.maybe_fire(bid, pending:, live:) ⇒ Object

Called from the server middleware after BATCH_ACK_SUCCESS. Fires ‘:complete` when live jids hit 0; fires `:success` when pending also hits 0 and there have been no deaths.



21
22
23
24
25
26
27
# File 'lib/wurk/batch/callbacks.rb', line 21

def maybe_fire(bid, pending:, live:)
  return unless live.zero?

  fire_complete(bid)
  fire_success(bid) if pending.zero? && !death_fired?(bid)
  propagate_to_parent(bid)
end

.parent_bid_for(bid) ⇒ Object



161
162
163
# File 'lib/wurk/batch/callbacks.rb', line 161

def parent_bid_for(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'parent_bid') }
end

.parse_callbacks(raw) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/wurk/batch/callbacks.rb', line 132

def parse_callbacks(raw)
  return [] if raw.nil? || raw.empty?

  JSON.parse(raw)
rescue JSON::ParserError
  []
end

.pending_for(bid) ⇒ Object



172
# File 'lib/wurk/batch/callbacks.rb', line 172

def pending_for(bid) = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'pending') }.to_i

.pkids_drained?(parent_bid, child_bid) ⇒ Boolean

Returns:

  • (Boolean)


165
166
167
168
169
170
# File 'lib/wurk/batch/callbacks.rb', line 165

def pkids_drained?(parent_bid, child_bid)
  Wurk.redis do |conn|
    conn.call('SREM', "b-#{parent_bid}-pkids", child_bid)
    conn.call('SCARD', "b-#{parent_bid}-pkids").to_i.zero?
  end
end

.propagate_to_parent(bid) ⇒ Object

When a child batch’s ‘:success` fires, decrement the parent’s pkids set so the parent’s own ‘:success` waits on the full subtree. When parent’s pkids hits 0 and its own pending is 0, parent’s success fires too.



153
154
155
156
157
158
159
# File 'lib/wurk/batch/callbacks.rb', line 153

def propagate_to_parent(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?
  return unless pkids_drained?(parent_bid, bid)

  maybe_fire(parent_bid, pending: pending_for(parent_bid), live: live_for(parent_bid))
end

.record_event(bid, field) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/wurk/batch/callbacks.rb', line 89

def record_event(bid, field)
  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis do |conn|
    conn.call('HSET', "b-#{bid}", field, now.to_s)
    conn.call('HSET', "b-#{bid}", field.to_s.sub('_at', ''), '1')
  end
end