Class: SidekiqBatch::BatchEnrollmentContext

Inherits:
Object
Defined in:
app/models/sidekiq_batch/batch_enrollment_context.rb

Overview

Scopes a block of `perform_async` calls to a specific SidekiqBatch. Active only on the enrolling thread via a thread-local reference; child jobs spawned at execution time from tracked workers are pass-through (by design).

Enrollment itself is performed by `SidekiqBatch::ClientMiddleware`, which looks up the thread-local context during each client push and writes a `SidekiqBatchJob` row BEFORE Sidekiq's `raw_push` sends the job to Redis. The client middleware sits outermost in the chain, so dedupe/suppression middleware added earlier runs inside it and gets the final say on whether a payload is actually enrolled.
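The interplay between the thread-local context and the client middleware can be sketched without Sidekiq or ActiveRecord. Everything below (`SketchContext`, `SketchClientMiddleware`, the suppression lambda) is a hypothetical stand-in, not the real implementation; only the `call(job_class, job, queue, redis_pool)` shape with a `yield` to the rest of the chain follows Sidekiq's client middleware convention.

```ruby
# Hypothetical stand-in for BatchEnrollmentContext; records jids in memory.
class SketchContext
  THREAD_KEY = :sketch_enrollment_context

  attr_reader :enrolled

  def self.current
    Thread.current[THREAD_KEY]
  end

  def initialize
    @enrolled = []
  end

  # Installs the context for the duration of the block, then clears it.
  def run
    Thread.current[THREAD_KEY] = self
    yield
  ensure
    Thread.current[THREAD_KEY] = nil
  end

  def enroll(payload)
    @enrolled << payload.fetch("jid")
  end
end

# Hypothetical outermost client middleware: inner middleware decides first,
# and only payloads that survive are enrolled.
class SketchClientMiddleware
  def call(_job_class, job, _queue, _redis_pool)
    result = yield
    return result unless result # suppressed by inner middleware: skip enrollment

    SketchContext.current&.enroll(job) # pass-through when no context is active
    result
  end
end

middleware = SketchClientMiddleware.new
# Stand-in for a dedupe/suppression middleware: drops jids ending in "1".
suppress = ->(job) { job["jid"].end_with?("1") ? false : job }

context = SketchContext.new
context.run do
  %w[jid-0 jid-1 jid-2].each do |jid|
    job = { "jid" => jid, "class" => "DemoWorker", "args" => [] }
    middleware.call("DemoWorker", job, "default", nil) { suppress.call(job) }
  end
end

# Outside the block no context is active, so this push is not enrolled.
late_job = { "jid" => "jid-3", "class" => "DemoWorker", "args" => [] }
middleware.call("DemoWorker", late_job, "default", nil) { late_job }

p context.enrolled # => ["jid-0", "jid-2"]
```

In this sketch the suppressed payload never reaches `enroll`, which is the property the outermost placement is meant to provide.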

Defined Under Namespace

Classes: EmptyEnrollmentError, Error, NestedError, TransactionError

Constant Summary

THREAD_KEY = :sidekiq_batch_enrollment_context
TXN_BASELINE = :sidekiq_batch_enrollment_txn_baseline


Constructor Details

#initialize(batch) ⇒ BatchEnrollmentContext

Returns a new instance of BatchEnrollmentContext.



# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 38

def initialize(batch)
  @batch          = batch
  @inserted_count = 0
end

Instance Attribute Details

#batch ⇒ Object (readonly)

Returns the value of attribute batch.



# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 43

def batch
  @batch
end

Class Method Details

.current ⇒ Object

Returns the enrollment context active on the calling thread, or nil if none is active.

# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 27

def self.current
  Thread.current[THREAD_KEY]
end
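A small sketch of why a context stored this way is invisible elsewhere: values written with `Thread.current[...]` are not shared with other threads, and in Ruby they are also local to the current fiber. The variable names below are illustrative only.

```ruby
key = :sidekiq_batch_enrollment_context # same symbol as THREAD_KEY

Thread.current[key] = :outer_context

# A different thread has its own storage and sees nothing.
seen_in_other_thread = Thread.new { Thread.current[key] }.value

# Thread#[] is fiber-local, so a new fiber on the same thread sees nothing either.
seen_in_fiber = Fiber.new { Thread.current[key] }.resume

seen_on_same_thread = Thread.current[key]
Thread.current[key] = nil

p seen_in_other_thread # => nil
p seen_in_fiber        # => nil
p seen_on_same_thread  # => :outer_context
```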

.transaction_baseline ⇒ Object

Specs running inside a fixture transaction set this to the open-txn count at the start of each example; anything above the baseline is a caller-opened transaction and is rejected.



# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 34

def self.transaction_baseline
  Thread.current[TXN_BASELINE] || 0
end
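The baseline comparison might look roughly like the following. The private `in_open_transaction?` check is not shown on this page, so `sketch_in_open_transaction?` is an assumed shape, and `FakeConnection` is a hypothetical stand-in that models only the `open_transactions` counter an ActiveRecord connection exposes.

```ruby
# Hypothetical stand-in for an ActiveRecord connection.
FakeConnection = Struct.new(:open_transactions)

BASELINE_KEY = :sidekiq_batch_enrollment_txn_baseline # same symbol as TXN_BASELINE

def sketch_transaction_baseline
  Thread.current[BASELINE_KEY] || 0
end

# Assumed logic: anything above the baseline counts as a caller-opened transaction.
def sketch_in_open_transaction?(connection)
  connection.open_transactions > sketch_transaction_baseline
end

# Outside specs the baseline defaults to 0.
outside_txn = sketch_in_open_transaction?(FakeConnection.new(0)) # false
inside_txn  = sketch_in_open_transaction?(FakeConnection.new(1)) # true

# In a spec wrapped by one fixture transaction, the baseline is set to 1.
Thread.current[BASELINE_KEY] = 1
fixture_only  = sketch_in_open_transaction?(FakeConnection.new(1)) # false
caller_opened = sketch_in_open_transaction?(FakeConnection.new(2)) # true
Thread.current[BASELINE_KEY] = nil
```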

Instance Method Details

#enroll(payload) ⇒ Object

Called by ClientMiddleware after downstream middleware has confirmed the job will be pushed. Insert is committed immediately (no surrounding transaction) so Sidekiq’s subsequent raw_push hands off a visible row.



# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 74

def enroll(payload)
  ::SidekiqBatchJob.create!(
    sidekiq_batch_id: @batch.id,
    jid:              payload.fetch("jid"),
    worker_class:     payload.fetch("class"),
    args:             payload.fetch("args", []),
    status:           "pending"
  )
  @inserted_count += 1
end
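To illustrate how the payload keys map onto row attributes, here is a minimal sketch using a plain hash in place of `SidekiqBatchJob.create!`. The payload values are made up; the point is that `fetch("args", [])` tolerates a missing key while `fetch("jid")` and `fetch("class")` raise `KeyError`.

```ruby
# Example payload with no "args" key (values are illustrative).
payload = { "jid" => "abc123", "class" => "DemoWorker" }

row_attributes = {
  jid:          payload.fetch("jid"),
  worker_class: payload.fetch("class"),
  args:         payload.fetch("args", []), # falls back to an empty list
  status:       "pending"
}

# A payload without a jid fails fast instead of writing a partial row.
missing_jid_error =
  begin
    {}.fetch("jid")
  rescue KeyError => e
    e.class
  end

p row_attributes[:args] # => []
p missing_jid_error     # => KeyError
```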

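#run ⇒ Object

Installs this context as the thread-local enrollment context, yields to the given block, then finalizes the batch. Raises NestedError if a context is already active on the thread, TransactionError if called inside an open ActiveRecord transaction, and EmptyEnrollmentError (after destroying the batch row) if the block enrolled zero jobs.
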
# File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 45

def run # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
  raise NestedError, "jobs {} is already active on this thread" if self.class.current
  # NOTE: We can't allow the context to work whilst wrapped by a transaction.
  # For the sake of tracking progress correctly we need the state of Redis and the DB to be one to one
  raise TransactionError, "jobs {} cannot be called inside an open ActiveRecord transaction" if in_open_transaction?

  Thread.current[THREAD_KEY] = self

  yield

  if @inserted_count.zero?
    # Empty block means this batch will never have work to complete.
    # Destroy the batch row so callers who rescue the error don't leave
    # orphan `pending` rows lying around forever.
    @batch.destroy
    raise EmptyEnrollmentError, "jobs {} block enrolled zero jobs"
  end

  actual_total = @batch.sidekiq_batch_jobs.count

  @batch.update!(total_jobs: actual_total, status: "running")
  @batch.attempt_completion!
ensure
  Thread.current[THREAD_KEY] = nil
end
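The finalization path can be exercised with in-memory stand-ins. This is a simplified sketch, not the method above: `FakeBatch`, `SketchRunner`, and `SketchEmptyError` are hypothetical, the total is taken from the in-memory counter rather than a row count, and the block receives the runner so it can call `enroll` directly instead of going through client middleware.

```ruby
class SketchEmptyError < StandardError; end

# Hypothetical in-memory batch standing in for the ActiveRecord model.
class FakeBatch
  attr_reader :status, :total_jobs, :destroyed

  def initialize
    @status = "pending"
    @destroyed = false
  end

  def destroy
    @destroyed = true
  end

  def update!(total_jobs:, status:)
    @total_jobs = total_jobs
    @status = status
  end
end

class SketchRunner
  KEY = :sketch_run_context

  def initialize(batch)
    @batch = batch
    @inserted_count = 0
  end

  def enroll(_payload)
    @inserted_count += 1
  end

  def run
    Thread.current[KEY] = self
    yield self
    if @inserted_count.zero?
      @batch.destroy # avoid leaving an orphan pending batch behind
      raise SketchEmptyError, "block enrolled zero jobs"
    end
    @batch.update!(total_jobs: @inserted_count, status: "running")
  ensure
    Thread.current[KEY] = nil # cleared on success and on error alike
  end
end

empty_batch = FakeBatch.new
begin
  SketchRunner.new(empty_batch).run { |_ctx| }
rescue SketchEmptyError
  # A caller may rescue this; the batch has already been destroyed.
end

full_batch = FakeBatch.new
SketchRunner.new(full_batch).run { |ctx| 2.times { ctx.enroll({}) } }

p empty_batch.destroyed                      # => true
p [full_batch.status, full_batch.total_jobs] # => ["running", 2]
```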