Class: SidekiqBatch::BatchEnrollmentContext
- Inherits: Object
- Defined in: app/models/sidekiq_batch/batch_enrollment_context.rb
Overview
Scopes a block of `perform_async` calls to a specific SidekiqBatch. The context is active only on the enrolling thread, via a thread-local reference; child jobs that tracked workers spawn at execution time pass through without being enrolled (by design).
Enrollment itself is performed by `SidekiqBatch::ClientMiddleware`, which looks up the thread-local context during each client push and writes a `SidekiqBatchJob` row BEFORE Sidekiq’s `raw_push` sends the job to Redis. The client middleware also sits outermost in the chain so that dedupe/suppression middleware added earlier gets the final say on whether a payload should actually be enrolled.
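The enrollment flow described above can be sketched as follows. This is a minimal illustration, not the actual `SidekiqBatch::ClientMiddleware` source: `SketchContext` and `SketchClientMiddleware` are hypothetical stand-ins that run without Sidekiq or ActiveRecord. The only Sidekiq convention relied on is that client middleware receives `(job_class, job, queue, redis_pool)`, yields to the rest of the chain, and that a falsy result cancels the push.

```ruby
# Hypothetical stand-in for the enrollment context (no database involved).
class SketchContext
  THREAD_KEY = :sketch_enrollment_context

  attr_reader :enrolled

  def self.current
    Thread.current[THREAD_KEY]
  end

  def initialize
    @enrolled = []
  end

  # Records the jid instead of writing a SidekiqBatchJob row.
  def enroll(payload)
    @enrolled << payload.fetch("jid")
  end

  def activate
    Thread.current[THREAD_KEY] = self
    yield
  ensure
    Thread.current[THREAD_KEY] = nil
  end
end

# Assumed shape of the client middleware: let the rest of the chain decide
# first, then enroll only payloads that will actually be pushed.
class SketchClientMiddleware
  def call(_job_class, job, _queue, _redis_pool)
    result = yield # inner middleware may return nil/false to cancel the push
    context = SketchContext.current
    context.enroll(job) if result && context
    result
  end
end

middleware = SketchClientMiddleware.new
context = SketchContext.new

context.activate do
  job_a = { "jid" => "a1", "class" => "ReportWorker" }
  job_b = { "jid" => "b2", "class" => "ReportWorker" }
  middleware.call("ReportWorker", job_a, "default", nil) { job_a } # pushed
  middleware.call("ReportWorker", job_b, "default", nil) { nil }   # suppressed
end

enrolled_jids = context.enrolled # => ["a1"]
```

Because the enrollment happens after `yield` returns, a payload that a dedupe or suppression middleware cancels never produces a row.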
Defined Under Namespace
Classes: EmptyEnrollmentError, Error, NestedError, TransactionError
Constant Summary
- THREAD_KEY = :sidekiq_batch_enrollment_context
- TXN_BASELINE = :sidekiq_batch_enrollment_txn_baseline
Instance Attribute Summary
- #batch ⇒ Object (readonly)
  Returns the value of attribute batch.
Class Method Summary
- .current ⇒ Object
- .transaction_baseline ⇒ Object
  Specs running inside a fixture transaction set this to the open-txn count at the start of each example; anything above the baseline is a caller-opened transaction and is rejected.
Instance Method Summary
- #enroll(payload) ⇒ Object
  Called by ClientMiddleware after downstream middleware has confirmed the job will be pushed.
- #initialize(batch) ⇒ BatchEnrollmentContext (constructor)
  A new instance of BatchEnrollmentContext.
- #run ⇒ Object
  Activates the context on the current thread for the duration of the block, then finalizes the batch.
Constructor Details
#initialize(batch) ⇒ BatchEnrollmentContext
Returns a new instance of BatchEnrollmentContext.
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 38
    def initialize(batch)
      @batch = batch
      @inserted_count = 0
    end
Instance Attribute Details
#batch ⇒ Object (readonly)
Returns the value of attribute batch.
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 43
    def batch
      @batch
    end
Class Method Details
.current ⇒ Object
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 27
    def self.current
      Thread.current[THREAD_KEY]
    end
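A short sketch of why a `Thread.current[...]` lookup confines the context to the enrolling thread. The key name below is hypothetical; the behavior shown is standard Ruby: `Thread#[]` storage is fiber-local, so neither another thread nor another fiber sees the value.

```ruby
key = :sketch_enrollment_context # hypothetical key, not the real THREAD_KEY

Thread.current[key] = :active_context

seen_on_same_thread  = Thread.current[key]                      # => :active_context
seen_on_other_thread = Thread.new { Thread.current[key] }.value # => nil
seen_on_other_fiber  = Fiber.new { Thread.current[key] }.resume # => nil

Thread.current[key] = nil # clean up, as an ensure block would
```

Any job pushed from a different thread or fiber inside the block would therefore find no current context and would not be enrolled.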
.transaction_baseline ⇒ Object
Specs running inside a fixture transaction set this to the open-txn count at the start of each example; anything above the baseline is a caller-opened transaction and is rejected.
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 34
    def self.transaction_baseline
      Thread.current[TXN_BASELINE] || 0
    end
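The baseline comparison could look like the sketch below. The source of `in_open_transaction?` is not shown on this page, so the logic here is an assumption inferred from the description above; `FakeConnection` is a hypothetical stand-in for an ActiveRecord connection, whose adapter reports the nesting depth through `open_transactions`.

```ruby
# Hypothetical stand-in for an ActiveRecord connection.
FakeConnection = Struct.new(:open_transactions)

# Assumed logic: only transactions opened above the baseline count as
# caller-opened and should be rejected.
def sketch_in_open_transaction?(connection, baseline)
  connection.open_transactions > baseline
end

production_idle    = sketch_in_open_transaction?(FakeConnection.new(0), 0) # => false
production_in_txn  = sketch_in_open_transaction?(FakeConnection.new(1), 0) # => true
spec_fixture_only  = sketch_in_open_transaction?(FakeConnection.new(1), 1) # => false
spec_caller_opened = sketch_in_open_transaction?(FakeConnection.new(2), 1) # => true
```

With a baseline of 0 (the default when nothing is set), any open transaction is rejected; in a spec wrapped in a fixture transaction, a baseline of 1 lets the wrapper through while still rejecting transactions the code under test opens.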
Instance Method Details
#enroll(payload) ⇒ Object
Called by ClientMiddleware after downstream middleware has confirmed the job will be pushed. The insert is committed immediately (no surrounding transaction), so the row is already visible when Sidekiq’s subsequent raw_push hands the job off.
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 74
    def enroll(payload)
      ::SidekiqBatchJob.create!(
        sidekiq_batch_id: @batch.id,
        jid: payload.fetch("jid"),
        worker_class: payload.fetch("class"),
        args: payload.fetch("args", []),
        status: "pending"
      )
      @inserted_count += 1
    end
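A small illustration of how the `fetch` calls in `enroll` treat the payload, using a made-up payload hash: `"jid"` and `"class"` are required and raise `KeyError` when absent, while `"args"` falls back to an empty array.

```ruby
payload = { "jid" => "abc123", "class" => "ReportWorker" } # illustrative payload, no "args"

args = payload.fetch("args", []) # => []

missing_jid_error =
  begin
    { "class" => "ReportWorker" }.fetch("jid")
    nil
  rescue KeyError => e
    e.class # => KeyError
  end
```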
#run ⇒ Object
Activates this context on the current thread, yields to the block so that jobs pushed inside it are enrolled, then records total_jobs, marks the batch as running, and attempts completion. Raises NestedError if a context is already active on the thread, TransactionError if called inside an open ActiveRecord transaction, and EmptyEnrollmentError (after destroying the batch) if the block enrolled zero jobs.
    # File 'app/models/sidekiq_batch/batch_enrollment_context.rb', line 45
    def run # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
      raise NestedError, "jobs {} is already active on this thread" if self.class.current

      # NOTE: We can't allow the context to work whilst wrapped by a transaction.
      # For the sake of tracking progress correctly we need the state of Redis and the DB to be one to one
      raise TransactionError, "jobs {} cannot be called inside an open ActiveRecord transaction" if in_open_transaction?

      Thread.current[THREAD_KEY] = self
      yield

      if @inserted_count.zero?
        # Empty block means this batch will never have work to complete.
        # Destroy the batch row so callers who rescue the error don't leave
        # orphan `pending` rows lying around forever.
        @batch.destroy
        raise EmptyEnrollmentError, "jobs {} block enrolled zero jobs"
      end

      actual_total = @batch.sidekiq_batch_jobs.count
      @batch.update!(total_jobs: actual_total, status: "running")
      @batch.attempt_completion!
    ensure
      Thread.current[THREAD_KEY] = nil
    end
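The lifecycle of `run` can be exercised with an in-memory sketch. Everything here is hypothetical: `SketchBatch` replaces the ActiveRecord batch model, `SketchRunContext` mirrors only the empty-block and finalization paths, and the block receives the context directly for simplicity (in the real class, enrollment goes through the client middleware and `run` yields no arguments).

```ruby
# Hypothetical in-memory batch; the real one is an ActiveRecord model.
class SketchBatch
  attr_reader :status, :total_jobs, :jids

  def initialize
    @jids = []
    @status = "pending"
    @total_jobs = 0
  end

  def destroy
    @status = "destroyed"
  end

  def start!
    @total_jobs = @jids.size
    @status = "running"
  end
end

class SketchEmptyError < StandardError; end

class SketchRunContext
  KEY = :sketch_run_context

  def initialize(batch)
    @batch = batch
    @inserted_count = 0
  end

  def enroll(jid)
    @batch.jids << jid
    @inserted_count += 1
  end

  def run
    Thread.current[KEY] = self
    yield self
    if @inserted_count.zero?
      @batch.destroy # avoid leaving an orphan pending batch behind
      raise SketchEmptyError, "block enrolled zero jobs"
    end
    @batch.start!
  ensure
    Thread.current[KEY] = nil # always cleared, even when the block raises
  end
end

full = SketchBatch.new
SketchRunContext.new(full).run do |ctx|
  ctx.enroll("a1")
  ctx.enroll("b2")
end

empty = SketchBatch.new
empty_error =
  begin
    SketchRunContext.new(empty).run { |_ctx| }
    nil
  rescue SketchEmptyError => e
    e
  end
```

After this runs, `full` is `"running"` with `total_jobs` of 2, `empty` is `"destroyed"`, and the thread-local key is `nil` in both cases because of the `ensure` clause.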