Class: Wurk::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/batch.rb,
lib/wurk/batch_set.rb,
lib/wurk/batch/empty.rb,
lib/wurk/batch/buffer.rb,
lib/wurk/batch/status.rb,
lib/wurk/batch/callbacks.rb,
lib/wurk/batch/callback_job.rb,
lib/wurk/batch/death_handler.rb,
lib/wurk/batch/client_middleware.rb,
lib/wurk/batch/server_middleware.rb

Overview

Sidekiq Pro Batches. Group jobs, attach success/complete/death callbacks, track progress. Spec: docs/target/sidekiq-pro.md §2.

Lifecycle:

1. `Batch.new` allocates a fresh BID; the batch is `mutable?` until the
   first `#jobs` block flushes — that flush HSETs the core hash, ZADDs
   to `batches`, and writes tag indexes.
2. `Batch.new(bid)` reopens an existing batch (legal from inside a job
   or callback only). `mutable?` is false because reopening implies the
   first flush already happened.
3. `#jobs { ... }` collects `Job.perform_async` calls via the client
   middleware (Thread.current[:wurk_current_batch] is the signal),
   atomically registering each via BATCH_PUSH.
4. Workers ack on success → BATCH_ACK_SUCCESS → pending--.
   Death handler acks on permanent failure → BATCH_ACK_COMPLETE.
5. When live jids hits zero → fire `:complete`. When pending also
   hits zero with zero deaths → fire `:success`.

Nested batches: a job opening its OWN batch (‘batch.jobs { … }`) increments live counters on the existing batch. A callback opening its PARENT batch links via parent_bid and adds child BID to b-<bid>-kids.

Defined Under Namespace

Modules: Callbacks Classes: Buffer, CallbackJob, ClientMiddleware, DeadSet, DeathHandler, Empty, ServerMiddleware, Status

Constant Summary collapse

DEFAULT_EXPIRY_SECONDS =
30 * 24 * 60 * 60
POST_SUCCESS_EXPIRY_SECONDS =
24 * 60 * 60
CALLBACK_NOTIFY_TTL =
30 * 24 * 60 * 60
BID_BYTES =

Bid is URL-safe base64 of 10 random bytes — matches Sidekiq Pro’s BID generator. Length matters: third-party gems that key off bid prefix (sharded batches in Pro 8) inspect the first character.

10
VALID_EVENTS =
%i[success complete death].freeze
KEY_SUFFIXES =

The ‘live’ set tracks jobs that have not yet reached a terminal state. When it’s empty, every job has either succeeded or died → ‘:complete` is allowed to fire.

%w[jids failed died notify cbsucc kids pkids tags].freeze
THREAD_KEY =
:wurk_current_batch
BUFFER_KEY =

Set on the current thread (to a Buffer) only inside an autoflush ‘#jobs` block. Client#raw_push reads it: when present, batched pushes accumulate here instead of round-tripping per job.

:wurk_batch_buffer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bid = nil) ⇒ Batch

Returns a new instance of Batch.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/wurk/batch.rb', line 62

def initialize(bid = nil)
  @bid              = bid || SecureRandom.urlsafe_base64(BID_BYTES)
  @existing         = !bid.nil?
  @description      = nil
  @callback_queue   = 'default'
  @callback_class   = nil
  @tags             = []
  @autoflush        = nil
  @linger           = nil
  @parent_bid       = nil
  @callbacks        = []
  @expires_in       = DEFAULT_EXPIRY_SECONDS
  @mutable          = !@existing
  @flushed_once     = @existing
  load_existing! if @existing
end

Instance Attribute Details

#autoflushObject

Returns the value of attribute autoflush.



55
56
57
# File 'lib/wurk/batch.rb', line 55

def autoflush
  @autoflush
end

#bidObject (readonly)

Returns the value of attribute bid.



54
55
56
# File 'lib/wurk/batch.rb', line 54

def bid
  @bid
end

#callback_classObject

Returns the value of attribute callback_class.



55
56
57
# File 'lib/wurk/batch.rb', line 55

def callback_class
  @callback_class
end

#callback_queueObject

Returns the value of attribute callback_queue.



55
56
57
# File 'lib/wurk/batch.rb', line 55

def callback_queue
  @callback_queue
end

#descriptionObject

Returns the value of attribute description.



55
56
57
# File 'lib/wurk/batch.rb', line 55

def description
  @description
end

#lingerObject

Returns the value of attribute linger.



54
55
56
# File 'lib/wurk/batch.rb', line 54

def linger
  @linger
end

#parent_bidObject (readonly)

Returns the value of attribute parent_bid.



54
55
56
# File 'lib/wurk/batch.rb', line 54

def parent_bid
  @parent_bid
end

Class Method Details

.keys_for(bid) ⇒ Object



57
58
59
60
# File 'lib/wurk/batch.rb', line 57

def self.keys_for(bid)
  base = "b-#{bid}"
  [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }]
end

Instance Method Details

#expires_in(duration) ⇒ Object



147
148
149
150
# File 'lib/wurk/batch.rb', line 147

def expires_in(duration)
  @expires_in = duration.to_i
  self
end

#include?(jid) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/wurk/batch.rb', line 112

def include?(jid)
  Wurk.redis { |conn| conn.call('SISMEMBER', "b-#{@bid}-jids", jid) }.to_i.positive?
end

#invalidate_allObject

Mark batch invalid. Pending jobs still exist in their queues; the server middleware short-circuits them when it observes the flag. Cascades to descendant batches via b-<bid>-kids.



134
135
136
137
# File 'lib/wurk/batch.rb', line 134

def invalidate_all
  cascade_invalidate(@bid)
  nil
end

#jobs(&block) ⇒ Object

Atomic enqueue block. Inside the block, ‘Job.perform_async` finds this batch via Thread.current and stamps `bid` onto the payload — the client middleware then uses BATCH_PUSH to register and push atomically. Empty blocks synthesise a Batch::Empty no-op so callbacks still fire.

Raises:

  • (ArgumentError)


169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/wurk/batch.rb', line 169

def jobs(&block)
  raise ArgumentError, 'jobs requires a block' unless block

  ensure_first_flush!
  pre_count = job_count
  collect_jobs(&block)
  # By the time we check, the buffer (if any) has flushed, so `total`
  # reflects everything the block pushed — a flat count is reliable.
  enqueue_empty_marker if job_count == pre_count
  @mutable = false
  self
end

#mutable?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/wurk/batch.rb', line 108

def mutable?
  @mutable
end

#on(event, callback, options = {}) ⇒ Object

Register a callback. Multiple callbacks of the same event are allowed. The callback target may be a Class, “Foo#bar” string spec, or anything responding to ‘name`. `options` must be JSON-serializable.

Raises:

  • (ArgumentError)


155
156
157
158
159
160
161
162
# File 'lib/wurk/batch.rb', line 155

def on(event, callback, options = {})
  sym = event.to_sym
  raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym)
  raise ArgumentError, 'callback options must be a Hash' unless options.is_a?(Hash)

  @callbacks << [sym.to_s, callback_target(callback), options]
  self
end

#parentObject



102
103
104
105
106
# File 'lib/wurk/batch.rb', line 102

def parent
  return nil if @parent_bid.nil? || @parent_bid.empty?

  Batch.new(@parent_bid)
end

#remove_jobs(*jids) ⇒ Object

Remove jobs from the batch. Decrements pending/total by exactly the count of jids actually removed (idempotent for repeated calls).



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/wurk/batch.rb', line 118

def remove_jobs(*jids)
  return 0 if jids.empty?

  Wurk.redis do |conn|
    removed = conn.call('SREM', "b-#{@bid}-jids", *jids).to_i
    if removed.positive?
      conn.call('HINCRBY', "b-#{@bid}", 'pending', -removed)
      conn.call('HINCRBY', "b-#{@bid}", 'total', -removed)
    end
    removed
  end
end

#statusObject



143
144
145
# File 'lib/wurk/batch.rb', line 143

def status
  Status.new(@bid)
end

#tagsObject



85
86
87
# File 'lib/wurk/batch.rb', line 85

def tags
  @tags.dup
end

#tags=(value) ⇒ Object

Hash assignment writes strings — Sidekiq’s UI / third-party gems expect String tags. Array-coercion lets callers pass a String or Set.



81
82
83
# File 'lib/wurk/batch.rb', line 81

def tags=(value)
  @tags = Array(value).map(&:to_s)
end

#valid?Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/wurk/batch.rb', line 139

def valid?
  Wurk.redis { |conn| conn.call('HGET', "b-#{@bid}", 'invalidated') } != '1'
end