Class: ActiveJob::QueueAdapters::TemporalAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/activejob/temporal/adapter.rb

Overview

Note:

Idempotent Enqueuing Jobs with the same job_id will not be enqueued twice. The adapter uses FAIL conflict policy, so duplicate enqueue attempts surface as DuplicateEnqueueError through ActiveJob’s enqueue status.

Note:

Transaction Safety Jobs using the Temporal adapter are opted into ActiveJob’s ‘enqueue_after_transaction_commit` setting. This defers workflow starts until the current database transaction commits and prevents workflows from starting for rolled-back jobs.

ActiveJob queue adapter for Temporal workflows.

This adapter integrates ActiveJob with Temporal by starting workflows for each enqueued job. It translates ActiveJob’s ‘perform_later` and `set(wait:).perform_later` into Temporal workflow starts with the AjWorkflow.

Examples:

Basic usage

class MyJob < ApplicationJob
  self.queue_adapter = :temporal
  def perform(arg)
    # job logic
  end
end
MyJob.perform_later("arg")

Scheduled job

MyJob.set(wait: 1.hour).perform_later("arg")

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeTemporalAdapter

Initialize the adapter with a WorkflowEnqueuer service instance.



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/activejob/temporal/adapter.rb', line 126

def initialize
  super

  config = ActiveJob::Temporal.config
  logger = config.logger

  @enqueuer = ActiveJob::Temporal::WorkflowEnqueuer.new(
    -> { ActiveJob::Temporal.client },
    config,
    logger
  )
end

Instance Attribute Details

#enqueuerWorkflowEnqueuer (readonly)

Returns the enqueuer service.

Returns:

  • (WorkflowEnqueuer)

    the enqueuer service



123
124
125
# File 'lib/activejob/temporal/adapter.rb', line 123

def enqueuer
  @enqueuer
end

Instance Method Details

#enqueue(job) ⇒ Object

Note:

FAIL Conflict Policy Duplicate job_id values raise DuplicateEnqueueError. ActiveJob catches this as an enqueue failure, so ‘perform_later` returns false and the yielded job exposes the error through `enqueue_error`.

Enqueues a job for immediate execution on Temporal by starting the AjWorkflow.

Delegates to the WorkflowEnqueuer service to handle the mechanics of workflow creation and startup.

Examples:

Basic usage

adapter = TemporalAdapter.new
job = MyJob.new("arg1", "arg2")
adapter.enqueue(job)

Handling serialization errors

begin
  MyJob.perform_later(huge_object)
rescue ActiveJob::SerializationError => e
  Rails.logger.error("Payload too large: #{e.message}")
  MyJob.perform_later(huge_object.id)  # Pass ID instead
end

Handling configuration errors

begin
  MyJob.perform_later("arg")
rescue ActiveJob::Temporal::ConfigurationError => e
  # Configuration validation failed
  Rails.logger.fatal("Invalid Temporal configuration: #{e.message}")
end

Handling enqueue errors (cluster unreachable)

begin
  MyJob.perform_later("arg")
rescue ActiveJob::EnqueueError => e
  # Temporal cluster is down or network issue
  Rails.logger.error("Cannot enqueue job: #{e.message}")
  # Consider queuing to fallback system or retrying later
end

Parameters:

  • job (ActiveJob::Base)

    the job instance provided by ActiveJob

Returns:

  • (Object)

    workflow run handle if provided by Temporal SDK

Raises:

  • (ActiveJob::SerializationError)

    if payload serialization fails or exceeds max_payload_size_kb

  • (ActiveJob::EnqueueError)

    if the Temporal client cannot start the workflow

  • (ActiveJob::Temporal::ConfigurationError)

    if configuration is invalid

See Also:



188
189
190
# File 'lib/activejob/temporal/adapter.rb', line 188

def enqueue(job)
  @enqueuer.enqueue(job)
end

#enqueue_after_transaction_commit?Boolean

Signals transaction-aware ActiveJob versions to defer enqueuing until after commit.

Rails 8 uses the job class ‘enqueue_after_transaction_commit` setting instead. The TransactionSafety hook enables that setting when a job selects the Temporal adapter. This method remains for adapter-contract compatibility with older ActiveJob behavior.

Examples:

Transaction-safe enqueuing

ActiveRecord::Base.transaction do
  user = User.create!(name: "Alice")
  MyJob.perform_later(user) # Deferred until commit
  raise ActiveRecord::Rollback # Job is NOT enqueued
end

Ensuring job runs after DB commit

ActiveRecord::Base.transaction do
  order = Order.create!(amount: 100)
  PaymentJob.perform_later(order.id)
  # Job will not start until transaction commits successfully
end

Returns:

  • (Boolean)

    always returns true



252
253
254
# File 'lib/activejob/temporal/adapter.rb', line 252

def enqueue_after_transaction_commit?
  true
end

#enqueue_at(job, timestamp) ⇒ Object

Note:

Non-Blocking Sleep The workflow uses Temporal’s durable timer mechanism, so scheduled jobs do not consume worker resources while waiting.

Enqueues a job for execution at a specific time by starting the AjWorkflow immediately.

The workflow starts immediately but sleeps (non-blockingly) until the scheduled time before executing the activity. This leverages Temporal’s durable timers.

Examples:

Basic usage

adapter = TemporalAdapter.new
job = MyJob.new("arg")
adapter.enqueue_at(job, 1.hour.from_now.to_i)

Scheduling with ActiveJob DSL

MyJob.set(wait: 1.hour).perform_later("arg")

Scheduling with wait_until

MyJob.set(wait_until: Date.tomorrow.noon).perform_later("arg")

Far-future scheduling (durable timer benefits)

# Schedule a job 30 days in the future
MyJob.set(wait: 30.days).perform_later("reminder", user_id: 123)
# The workflow sleeps for 30 days without consuming resources

Parameters:

  • job (ActiveJob::Base)

    the job instance provided by ActiveJob

  • timestamp (Integer, Float)

    UNIX timestamp when the job should be executed

Returns:

  • (Object)

    workflow run handle if provided by Temporal SDK

Raises:

  • (ActiveJob::SerializationError)

    if payload serialization fails or exceeds max_payload_size_kb

  • (ActiveJob::EnqueueError)

    if the Temporal client cannot start the workflow

  • (ActiveJob::Temporal::ConfigurationError)

    if configuration is invalid

See Also:

  • #enqueue
  • Workflows::AjWorkflow#sleep_until


227
228
229
230
# File 'lib/activejob/temporal/adapter.rb', line 227

def enqueue_at(job, timestamp)
  scheduled_time = Time.at(timestamp)
  @enqueuer.enqueue(job, scheduled_at: scheduled_time)
end