Class: Wurk::Batch
- Inherits: Object
  - Object
    - Wurk::Batch
- Defined in:
  - lib/wurk/batch.rb
  - lib/wurk/batch_set.rb
  - lib/wurk/batch/empty.rb
  - lib/wurk/batch/buffer.rb
  - lib/wurk/batch/status.rb
  - lib/wurk/batch/callbacks.rb
  - lib/wurk/batch/callback_job.rb
  - lib/wurk/batch/death_handler.rb
  - lib/wurk/batch/client_middleware.rb
  - lib/wurk/batch/server_middleware.rb
Overview
Sidekiq Pro Batches. Group jobs, attach success/complete/death callbacks, track progress. Spec: docs/target/sidekiq-pro.md §2.
Lifecycle:
1. `Batch.new` allocates a fresh BID; the batch is `mutable?` until the
first `#jobs` block flushes — that flush HSETs the core hash, ZADDs
to `batches`, and writes tag indexes.
2. `Batch.new(bid)` reopens an existing batch (legal from inside a job
or callback only). `mutable?` is false because reopening implies the
first flush already happened.
3. `#jobs { ... }` collects `Job.perform_async` calls via the client
middleware (Thread.current[:wurk_current_batch] is the signal),
atomically registering each via BATCH_PUSH.
4. Workers ack on success → BATCH_ACK_SUCCESS → pending--.
Death handler acks on permanent failure → BATCH_ACK_COMPLETE.
5. When the live jid count hits zero → fire `:complete`. When pending also
hits zero with zero deaths → fire `:success`.
Nested batches: a job opening its OWN batch (`batch.jobs { … }`) increments live counters on the existing batch. A callback opening its PARENT batch links via parent_bid and adds the child BID to b-<bid>-kids.
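The lifecycle steps above can be sketched as a small in-memory model. This is an illustration only: `BatchCounters` and its method names are hypothetical, and the real implementation keeps these counters in Redis and updates them through the BATCH_PUSH / BATCH_ACK_* scripts. The sketch assumes `:complete` fires once no live jids remain and `:success` fires once pending is also zero with no deaths, as step 5 describes.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the Redis-backed batch counters.
class BatchCounters
  attr_reader :fired

  def initialize
    @live = Set.new # jids that have not reached a terminal state
    @pending = 0    # jobs that have not yet succeeded
    @deaths = 0
    @fired = []
  end

  def push(jid)
    @live << jid
    @pending += 1
  end

  def ack_success(jid)
    return unless @live.delete?(jid)

    @pending -= 1
    check
  end

  def ack_death(jid)
    return unless @live.delete?(jid)

    @deaths += 1
    check
  end

  private

  # Fire each event at most once, in the order described in step 5.
  def check
    return unless @live.empty?

    @fired << :complete unless @fired.include?(:complete)
    return if @fired.include?(:success)

    @fired << :success if @pending.zero? && @deaths.zero?
  end
end

ok = BatchCounters.new
ok.push('a')
ok.push('b')
ok.ack_success('a')
ok.ack_success('b')
p ok.fired

mixed = BatchCounters.new
mixed.push('a')
mixed.push('b')
mixed.ack_success('a')
mixed.ack_death('b')
p mixed.fired
```

In the second run one job dies, so only `:complete` is recorded; `:success` requires every job to have succeeded.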
Defined Under Namespace
Modules: Callbacks
Classes: Buffer, CallbackJob, ClientMiddleware, DeadSet, DeathHandler, Empty, ServerMiddleware, Status
Constant Summary
- DEFAULT_EXPIRY_SECONDS =
30 * 24 * 60 * 60
- POST_SUCCESS_EXPIRY_SECONDS =
24 * 60 * 60
- CALLBACK_NOTIFY_TTL =
30 * 24 * 60 * 60
- BID_BYTES =
Bid is URL-safe base64 of 10 random bytes, matching Sidekiq Pro's BID generator. Length matters: third-party gems that key off the bid prefix (sharded batches in Pro 8) inspect the first character.
10
- VALID_EVENTS =
%i[success complete death].freeze
- KEY_SUFFIXES =
The 'live' set tracks jobs that have not yet reached a terminal state. When it is empty, every job has either succeeded or died, so `:complete` is allowed to fire.
%w[jids failed died notify cbsucc kids pkids tags].freeze
- THREAD_KEY =
:wurk_current_batch
- BUFFER_KEY =
Set on the current thread (to a Buffer) only inside an autoflush `#jobs` block. Client#raw_push reads it: when present, batched pushes accumulate here instead of round-tripping per job.
:wurk_batch_buffer
Instance Attribute Summary
-
#autoflush ⇒ Object
Returns the value of attribute autoflush.
-
#bid ⇒ Object
readonly
Returns the value of attribute bid.
-
#callback_class ⇒ Object
Returns the value of attribute callback_class.
-
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
-
#description ⇒ Object
Returns the value of attribute description.
-
#linger ⇒ Object
Returns the value of attribute linger.
-
#parent_bid ⇒ Object
readonly
Returns the value of attribute parent_bid.
Class Method Summary
- .keys_for(bid) ⇒ Object
Instance Method Summary
- #expires_in(duration) ⇒ Object
- #include?(jid) ⇒ Boolean
-
#initialize(bid = nil) ⇒ Batch
constructor
A new instance of Batch.
-
#invalidate_all ⇒ Object
Mark batch invalid.
-
#jobs(&block) ⇒ Object
Atomic enqueue block.
- #mutable? ⇒ Boolean
-
#on(event, callback, options = {}) ⇒ Object
Register a callback.
- #parent ⇒ Object
-
#remove_jobs(*jids) ⇒ Object
Remove jobs from the batch.
- #status ⇒ Object
- #tags ⇒ Object
-
#tags=(value) ⇒ Object
Hash assignment writes strings — Sidekiq’s UI / third-party gems expect String tags.
- #valid? ⇒ Boolean
Constructor Details
#initialize(bid = nil) ⇒ Batch
Returns a new instance of Batch.
    # File 'lib/wurk/batch.rb', line 62
    def initialize(bid = nil)
      @bid = bid || SecureRandom.urlsafe_base64(BID_BYTES)
      @existing = !bid.nil?
      @description = nil
      @callback_queue = 'default'
      @callback_class = nil
      @tags = []
      @autoflush = nil
      @linger = nil
      @parent_bid = nil
      @callbacks = []
      @expires_in = DEFAULT_EXPIRY_SECONDS
      @mutable = !@existing
      @flushed_once = @existing
      load_existing! if @existing
    end
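To get a feel for what a fresh BID looks like, the snippet below calls the same standard-library generator the constructor uses. The local `bid_bytes` variable mirrors BID_BYTES; nothing else about Wurk is assumed.

```ruby
require 'securerandom'

bid_bytes = 10 # mirrors Wurk::Batch::BID_BYTES
bid = SecureRandom.urlsafe_base64(bid_bytes)

# 10 bytes are 80 bits; at 6 bits per base64 character that is 14
# characters once padding is dropped (urlsafe_base64 omits it by default).
puts bid.length
puts bid.match?(/\A[A-Za-z0-9_-]+\z/)
```

The value itself is random on every run; only the length and the URL-safe alphabet are stable.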
Instance Attribute Details
#autoflush ⇒ Object
Returns the value of attribute autoflush.
    # File 'lib/wurk/batch.rb', line 55
    def autoflush
      @autoflush
    end
#bid ⇒ Object (readonly)
Returns the value of attribute bid.
    # File 'lib/wurk/batch.rb', line 54
    def bid
      @bid
    end
#callback_class ⇒ Object
Returns the value of attribute callback_class.
    # File 'lib/wurk/batch.rb', line 55
    def callback_class
      @callback_class
    end
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
    # File 'lib/wurk/batch.rb', line 55
    def callback_queue
      @callback_queue
    end
#description ⇒ Object
Returns the value of attribute description.
    # File 'lib/wurk/batch.rb', line 55
    def description
      @description
    end
#linger ⇒ Object
Returns the value of attribute linger.
    # File 'lib/wurk/batch.rb', line 54
    def linger
      @linger
    end
#parent_bid ⇒ Object (readonly)
Returns the value of attribute parent_bid.
    # File 'lib/wurk/batch.rb', line 54
    def parent_bid
      @parent_bid
    end
Class Method Details
.keys_for(bid) ⇒ Object
    # File 'lib/wurk/batch.rb', line 57
    def self.keys_for(bid)
      base = "b-#{bid}"
      [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }]
    end
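As a quick illustration of the key layout, the standalone snippet below repeats the method body outside the class. KEY_SUFFIXES is copied from the constant summary, and `'abc123'` is a made-up BID used only for the example.

```ruby
# Copied from the constant summary; the method body mirrors Batch.keys_for.
KEY_SUFFIXES = %w[jids failed died notify cbsucc kids pkids tags].freeze

def keys_for(bid)
  base = "b-#{bid}"
  [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }]
end

keys = keys_for('abc123') # 'abc123' is a made-up BID
puts keys
```

The result is the core hash key followed by one key per suffix, nine keys in total.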
Instance Method Details
#expires_in(duration) ⇒ Object
    # File 'lib/wurk/batch.rb', line 147
    def expires_in(duration)
      @expires_in = duration.to_i
      self
    end
#include?(jid) ⇒ Boolean
    # File 'lib/wurk/batch.rb', line 112
    def include?(jid)
      Wurk.redis { |conn| conn.call('SISMEMBER', "b-#{@bid}-jids", jid) }.to_i.positive?
    end
#invalidate_all ⇒ Object
Mark batch invalid. Pending jobs still exist in their queues; the server middleware short-circuits them when it observes the flag. Cascades to descendant batches via b-<bid>-kids.
    # File 'lib/wurk/batch.rb', line 134
    def invalidate_all
      cascade_invalidate(@bid)
      nil
    end
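The body of `cascade_invalidate` is not shown on this page, so the following is only a sketch of what a cascade over b-<bid>-kids could look like. The `KIDS` hash is a hypothetical in-memory substitute for the Redis sets, and the traversal order and the `seen` guard are assumptions rather than the library's actual behaviour.

```ruby
require 'set'

# Hypothetical parent => children map standing in for the b-<bid>-kids sets.
KIDS = {
  'root' => %w[child1 child2],
  'child1' => %w[grandchild],
  'child2' => [],
  'grandchild' => []
}.freeze

# Walk the tree iteratively, collecting every reachable batch.
# `seen` guards against visiting a batch twice.
def cascade_invalidate(bid, kids: KIDS, seen: Set.new)
  stack = [bid]
  until stack.empty?
    current = stack.pop
    next unless seen.add?(current)

    stack.concat(kids.fetch(current, []))
  end
  seen
end

invalid = cascade_invalidate('root')
p invalid.to_a.sort
```

Starting from `'root'`, every descendant ends up in the invalidated set, which is the effect the method description attributes to the cascade.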
#jobs(&block) ⇒ Object
Atomic enqueue block. Inside the block, `Job.perform_async` finds this batch via Thread.current and stamps `bid` onto the payload; the client middleware then uses BATCH_PUSH to register and push atomically. Empty blocks synthesise a Batch::Empty no-op so callbacks still fire.
    # File 'lib/wurk/batch.rb', line 169
    def jobs(&block)
      raise ArgumentError, 'jobs requires a block' unless block

      ensure_first_flush!
      pre_count = job_count
      collect_jobs(&block)
      # By the time we check, the buffer (if any) has flushed, so `total`
      # reflects everything the block pushed; a flat count is reliable.
      enqueue_empty_marker if job_count == pre_count
      @mutable = false
      self
    end
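The thread-local hand-off described above can be sketched without Redis. `collect_jobs` and the client middleware are not shown on this page, so `with_current_batch` and `stamp` below are hypothetical helpers; only the THREAD_KEY value comes from the constant summary. The sketch also stores a plain BID string where the real code presumably stores the batch object.

```ruby
THREAD_KEY = :wurk_current_batch

# Hypothetical helper: expose `batch` to code running inside the block,
# restoring whatever was there before so nested blocks unwind correctly.
def with_current_batch(batch)
  previous = Thread.current[THREAD_KEY]
  Thread.current[THREAD_KEY] = batch
  yield
ensure
  Thread.current[THREAD_KEY] = previous
end

# Hypothetical stand-in for the client middleware: stamp the bid onto the
# payload if a batch is active on this thread.
def stamp(payload)
  batch = Thread.current[THREAD_KEY]
  batch ? payload.merge('bid' => batch) : payload
end

inside = with_current_batch('bid-1') { stamp('class' => 'MyJob') }
outside = stamp('class' => 'MyJob')
p inside
p outside
```

Restoring the previous value in `ensure` means a job pushed after the block, or after an exception inside it, is not accidentally attached to the batch.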
#mutable? ⇒ Boolean
    # File 'lib/wurk/batch.rb', line 108
    def mutable?
      @mutable
    end
#on(event, callback, options = {}) ⇒ Object
Register a callback. Multiple callbacks of the same event are allowed. The callback target may be a Class, a "Foo#bar" string spec, or anything responding to `name`. `options` must be JSON-serializable.
    # File 'lib/wurk/batch.rb', line 155
    def on(event, callback, options = {})
      sym = event.to_sym
      raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym)
      raise ArgumentError, 'callback options must be a Hash' unless options.is_a?(Hash)

      @callbacks << [sym.to_s, callback_target(callback), options]
      self
    end
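The validation and chaining behaviour of `#on` can be tried in isolation with a stand-in class. `CallbackList` is hypothetical, and because `callback_target` is not shown on this page the sketch simply calls `to_s` on the target; the `'Reports#done'` specs are made-up names.

```ruby
# Hypothetical stand-in reproducing only the validation and chaining in #on.
class CallbackList
  VALID_EVENTS = %i[success complete death].freeze

  attr_reader :callbacks

  def initialize
    @callbacks = []
  end

  def on(event, callback, options = {})
    sym = event.to_sym
    raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym)
    raise ArgumentError, 'callback options must be a Hash' unless options.is_a?(Hash)

    # Assumption: the real method normalises the target via callback_target.
    @callbacks << [sym.to_s, callback.to_s, options]
    self
  end
end

list = CallbackList.new
list.on(:success, 'Reports#done', { 'report_id' => 7 })
    .on('complete', 'Reports#finished')
p list.callbacks

error = nil
begin
  list.on(:finished, 'Reports#done')
rescue ArgumentError => e
  error = e.message
end
puts error
```

Because `on` returns `self`, registrations chain, and an event outside VALID_EVENTS is rejected before anything is stored.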
#parent ⇒ Object
    # File 'lib/wurk/batch.rb', line 102
    def parent
      return nil if @parent_bid.nil? || @parent_bid.empty?

      Batch.new(@parent_bid)
    end
#remove_jobs(*jids) ⇒ Object
Remove jobs from the batch. Decrements pending/total by exactly the count of jids actually removed (idempotent for repeated calls).
    # File 'lib/wurk/batch.rb', line 118
    def remove_jobs(*jids)
      return 0 if jids.empty?

      Wurk.redis do |conn|
        removed = conn.call('SREM', "b-#{@bid}-jids", *jids).to_i
        if removed.positive?
          conn.call('HINCRBY', "b-#{@bid}", 'pending', -removed)
          conn.call('HINCRBY', "b-#{@bid}", 'total', -removed)
        end
        removed
      end
    end
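The idempotence claim follows from SREM returning only the number of members it actually removed. The sketch below models that with an in-memory Set and Hash; both are hypothetical stand-ins for b-<bid>-jids and the core hash, and the jids are made up.

```ruby
require 'set'

# Hypothetical in-memory stand-ins for b-<bid>-jids and the core hash.
jids = Set['j1', 'j2', 'j3']
counters = { 'pending' => 3, 'total' => 3 }

# Mirrors SREM semantics: only members actually present count as removed.
remove = lambda do |*targets|
  removed = targets.count { |jid| jids.delete?(jid) }
  if removed.positive?
    counters['pending'] -= removed
    counters['total'] -= removed
  end
  removed
end

first = remove.call('j1', 'j2', 'missing')
second = remove.call('j1', 'j2') # repeat: nothing left to remove
p first, second, counters
```

The repeated call removes nothing, so the counters are decremented once per job no matter how often the same jids are passed.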
#tags ⇒ Object
    # File 'lib/wurk/batch.rb', line 85
    def tags
      @tags.dup
    end
#tags=(value) ⇒ Object
Hash assignment writes strings — Sidekiq’s UI / third-party gems expect String tags. Array-coercion lets callers pass a String or Set.
    # File 'lib/wurk/batch.rb', line 81
    def tags=(value)
      @tags = Array(value).map(&:to_s)
    end
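The coercion in `tags=` is plain Ruby and can be checked on its own. The lambda below repeats the same expression; the tag values are arbitrary examples.

```ruby
require 'set'

# Same coercion as tags=: wrap in an Array, then stringify each element.
normalize = ->(value) { Array(value).map(&:to_s) }

from_string = normalize.call('urgent')
from_symbols = normalize.call(%i[billing nightly])
from_set = normalize.call(Set[:a, 1])
from_nil = normalize.call(nil)

p from_string, from_symbols, from_set, from_nil
```

A single String becomes a one-element array, a Set is expanded, and `nil` yields an empty list, so callers never store non-String tags.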