Class: ActiveJob::QueueAdapters::TemporalAdapter
Inherits: AbstractAdapter
- Object
  - AbstractAdapter
    - ActiveJob::QueueAdapters::TemporalAdapter
Defined in: lib/activejob/temporal/adapter.rb
Overview
ActiveJob queue adapter for Temporal workflows.
This adapter integrates ActiveJob with Temporal by starting a workflow for each enqueued job. It translates ActiveJob's `perform_later` and `set(wait:).perform_later` into Temporal workflow starts of AjWorkflow.
Idempotent Enqueuing: Jobs with the same job_id will not be enqueued twice. The adapter uses the FAIL conflict policy, so duplicate enqueue attempts surface as DuplicateEnqueueError through ActiveJob's enqueue status.
Transaction Safety: Jobs using the Temporal adapter are opted into ActiveJob's `enqueue_after_transaction_commit` setting. This defers workflow starts until the current database transaction commits, so no workflow starts for a job whose transaction was rolled back.
Instance Attribute Summary
- #enqueuer ⇒ WorkflowEnqueuer (readonly)
  The enqueuer service.
Instance Method Summary
- #enqueue(job) ⇒ Object
  Enqueues a job for immediate execution on Temporal by starting the AjWorkflow.
- #enqueue_after_transaction_commit? ⇒ Boolean
  Signals transaction-aware ActiveJob versions to defer enqueuing until after commit.
- #enqueue_at(job, timestamp) ⇒ Object
  Enqueues a job for execution at a specific time by starting the AjWorkflow immediately.
- #initialize ⇒ TemporalAdapter (constructor)
  Initialize the adapter with a WorkflowEnqueuer service instance.
Constructor Details
#initialize ⇒ TemporalAdapter
Initialize the adapter with a WorkflowEnqueuer service instance.
    # File 'lib/activejob/temporal/adapter.rb', line 126
    def initialize
      super
      config = ActiveJob::Temporal.config
      logger = config.logger
      @enqueuer = ActiveJob::Temporal::WorkflowEnqueuer.new(
        -> { ActiveJob::Temporal.client },
        config,
        logger
      )
    end
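The constructor passes the client as a lambda rather than as a resolved object. A minimal sketch of what that pattern achieves, assuming the enqueuer calls the lambda only when it needs a client: `SketchEnqueuer` and `SketchClient` are hypothetical stand-ins, not the gem's classes, and whether the real WorkflowEnqueuer memoizes the client is not stated here.

```ruby
# Hypothetical stand-in for a Temporal client; not the gem's real class.
class SketchClient
  def start(job_id)
    "started #{job_id}"
  end
end

# Hypothetical stand-in for an enqueuer that receives a client factory.
class SketchEnqueuer
  def initialize(client_factory)
    # The lambda is stored, not called, so no client is built here.
    @client_factory = client_factory
  end

  def enqueue(job_id)
    # The client is resolved only at the moment a job is enqueued.
    client = @client_factory.call
    client.start(job_id)
  end
end

factory_calls = 0
enqueuer = SketchEnqueuer.new(-> { factory_calls += 1; SketchClient.new })

puts factory_calls            # no client has been built yet
puts enqueuer.enqueue("job-1")
puts factory_calls            # the factory ran during enqueue
```

With this shape, constructing the adapter does not require a reachable client; the lookup is deferred to the first enqueue.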
Instance Attribute Details
#enqueuer ⇒ WorkflowEnqueuer (readonly)
Returns the enqueuer service.
    # File 'lib/activejob/temporal/adapter.rb', line 123
    def enqueuer
      @enqueuer
    end
Instance Method Details
#enqueue(job) ⇒ Object
FAIL Conflict Policy: Duplicate job_id values raise DuplicateEnqueueError. ActiveJob catches this as an enqueue failure, so `perform_later` returns false and the yielded job exposes the error through `enqueue_error`.
Enqueues a job for immediate execution on Temporal by starting the AjWorkflow.
Delegates to the WorkflowEnqueuer service to handle the mechanics of workflow creation and startup.
    # File 'lib/activejob/temporal/adapter.rb', line 188
    def enqueue(job)
      @enqueuer.enqueue(job)
    end
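The duplicate-handling behaviour described for this method can be sketched without Temporal or ActiveJob. Everything below is hypothetical and for illustration only: `SketchWorkflowStore` imitates a FAIL-style conflict policy with an in-memory set, and `sketch_perform_later` imitates how a failed enqueue yields `false` plus an `enqueue_error` on the job.

```ruby
require "set"

# Hypothetical error class standing in for DuplicateEnqueueError.
class SketchDuplicateEnqueueError < StandardError; end

# Hypothetical in-memory store imitating a FAIL conflict policy.
class SketchWorkflowStore
  def initialize
    @started = Set.new
  end

  # Set#add? returns nil when the id is already present, so a second
  # start with the same job_id raises instead of starting again.
  def start(job_id)
    unless @started.add?(job_id)
      raise SketchDuplicateEnqueueError, "workflow #{job_id} already started"
    end
    job_id
  end
end

# Hypothetical job object carrying only the fields this sketch needs.
SketchJob = Struct.new(:job_id, :enqueue_error)

# Returns the job on success; on a duplicate, records the error on the
# job and returns false.
def sketch_perform_later(store, job)
  store.start(job.job_id)
  job
rescue SketchDuplicateEnqueueError => e
  job.enqueue_error = e
  false
end
```

A caller would check the return value and, on `false`, read `enqueue_error` from the job to find out why the enqueue was rejected.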
#enqueue_after_transaction_commit? ⇒ Boolean
Signals transaction-aware ActiveJob versions to defer enqueuing until after commit.
Rails 8 uses the job class `enqueue_after_transaction_commit` setting instead. The TransactionSafety hook enables that setting when a job selects the Temporal adapter. This method remains for adapter-contract compatibility with older ActiveJob behavior.
    # File 'lib/activejob/temporal/adapter.rb', line 252
    def enqueue_after_transaction_commit?
      true
    end
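The effect of deferring enqueues until commit can be illustrated with a small in-memory model. This is a sketch of the general idea only, not of how ActiveJob or the TransactionSafety hook implement it; `SketchTransaction` and `sketch_enqueue` are hypothetical names.

```ruby
# Hypothetical transaction that queues callbacks until commit.
class SketchTransaction
  def initialize
    @after_commit = []
  end

  # Register work to run only if the transaction commits.
  def after_commit(&block)
    @after_commit << block
  end

  # Run the deferred work, then forget it.
  def commit
    @after_commit.each(&:call)
    @after_commit.clear
  end

  # Discard the deferred work without running it.
  def rollback
    @after_commit.clear
  end
end

# Hypothetical enqueue: the "workflow start" is modelled as appending
# the job id to the started list, deferred until commit.
def sketch_enqueue(transaction, started, job_id)
  transaction.after_commit { started << job_id }
end

started = []

committed = SketchTransaction.new
sketch_enqueue(committed, started, "job-a")
committed.commit

rolled_back = SketchTransaction.new
sketch_enqueue(rolled_back, started, "job-b")
rolled_back.rollback

p started # only the job from the committed transaction appears
```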
#enqueue_at(job, timestamp) ⇒ Object
Non-Blocking Sleep: The workflow uses Temporal's durable timer mechanism, so scheduled jobs do not consume worker resources while waiting.
Enqueues a job for execution at a specific time by starting the AjWorkflow immediately.
The workflow starts immediately but waits on one of Temporal's durable timers until the scheduled time, without blocking a worker, and only then executes the activity.
    # File 'lib/activejob/temporal/adapter.rb', line 227
    def enqueue_at(job, timestamp)
      scheduled_time = Time.at(timestamp)
      @enqueuer.enqueue(job, scheduled_at: scheduled_time)
    end
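ActiveJob hands `enqueue_at` the scheduled time as epoch seconds, which is why the method converts it with `Time.at`. The sketch below shows that conversion and one plausible way to derive how long a timer would need to wait. `sketch_timer_delay` is a hypothetical helper; how AjWorkflow actually computes its timer is not shown in this page.

```ruby
# Convert epoch seconds (Integer or Float) to a Time, as enqueue_at does.
def sketch_scheduled_time(timestamp)
  Time.at(timestamp)
end

# Hypothetical helper: seconds remaining until the scheduled time,
# clamped at zero so a time in the past means "run right away".
def sketch_timer_delay(timestamp, now: Time.now)
  [sketch_scheduled_time(timestamp) - now, 0].max
end

now = Time.at(1_700_000_000)
puts sketch_timer_delay(1_700_000_090.0, now: now) # 90 seconds ahead
puts sketch_timer_delay(1_699_999_000.0, now: now) # already in the past
```

Clamping at zero is a design choice in this sketch: it treats an overdue schedule as immediately runnable rather than as an error.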