Class: ActiveJob::Temporal::Activities::AjRunnerActivity

Inherits:
Temporalio::Activity::Definition
  • Object
show all
Defined in:
lib/activejob/temporal/activities/aj_runner_activity.rb

Overview

Note:

Idempotency and Retries Activities may be re-executed on transient failures due to Temporal’s retry logic. Job implementations MUST be idempotent. This activity sets an execution-local idempotency key (‘Fiber`) derived from the workflow ID to assist with idempotent external operations (e.g., API requests).

Note:

Execution-Local Idempotency Key Jobs can access ‘Fiber` to generate unique idempotency tokens for external API calls. `Thread.current` remains populated for existing synchronous jobs. The key format is “workflow_id/runner” and persists across retries for the same workflow execution.

Note:

Exception Handling If the job raises an exception that matches a ‘discard_on` declaration, the activity raises a non-retryable `ApplicationError` to stop retries. Otherwise, the exception propagates and Temporal applies the retry policy.

Temporal activity that executes the actual ActiveJob logic.

This activity hydrates the job class, deserializes arguments, and invokes the job’s ‘perform` method. It is the only place where side effects occur (database writes, API calls, etc.).

rubocop:disable Metrics/ClassLength

Examples:

Activity execution flow

1. Deserialize job arguments from payload
2. Constantize job class
3. Set execution-local idempotency key
4. Instantiate job and call `perform(*args)`
5. Handle exceptions (discard vs. retry)
6. Clear idempotency key

Using idempotency key in a job

class ChargeCustomerJob < ApplicationJob
  def perform(customer_id, amount)
    idempotency_key = Fiber[:aj_temporal_idempotency_key]
    StripeAPI.charge(
      customer_id: customer_id,
      amount: amount,
      idempotency_key: idempotency_key
    )
  end
end

See Also:

Defined Under Namespace

Classes: RetryRequested

Constant Summary collapse

IDEMPOTENCY_KEY =
:aj_temporal_idempotency_key
DESERIALIZATION_ERROR_CLASSES =
[
  ActiveJob::SerializationError,
  ActiveJob::DeserializationError
].freeze

Instance Method Summary collapse

Instance Method Details

#execute(payload, raw_arguments = nil) ⇒ Object?

Executes the job inside the Temporal activity context.

Examples:

Basic execution

execute({
  job_class: "MyJob",
  job_id: "123",
  arguments: [{ "_aj_serialized" => "ActiveJob::Serializers::ObjectSerializer", "value" => {...} }]
})

Accessing idempotency key in job

class MyJob < ApplicationJob
  def perform(user_id)
    key = Fiber[:aj_temporal_idempotency_key]
    ExternalAPI.create_user(user_id, idempotency_key: key)
  end
end

Handling discard_on exceptions

class MyJob < ApplicationJob
  discard_on ActiveRecord::RecordNotFound
  def perform(user_id)
    User.find(user_id).do_something
  end
end
# If RecordNotFound is raised, activity raises non-retryable ApplicationError

Parameters:

  • payload (Hash)

    Job payload with serialized arguments and metadata

Options Hash (payload):

  • :job_class (String)

    Fully-qualified job class name (required)

  • :job_id (String)

    Unique job identifier

  • :arguments (Array)

    Serialized job arguments (via ActiveJob::Arguments)

  • :queue_name (String)

    Target queue name

  • :executions (Integer)

    Current execution count

  • :exception_executions (Hash)

    Exception execution counts

Returns:

  • (Object, nil)

    Result of the job’s ‘perform` method (typically nil)

Raises:

  • (ArgumentError)

    if payload is missing job_class

  • (NameError)

    if job_class cannot be constantized

  • (ActiveJob::SerializationError)

    if arguments cannot be deserialized

  • (Temporalio::Error::ApplicationError)

    if job raises a discardable exception (non-retryable)

  • (StandardError)

    if job raises a retryable exception (propagates to Temporal)



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/activejob/temporal/activities/aj_runner_activity.rb', line 120

def execute(payload, raw_arguments = nil)
  job_class = nil
  audit_context = nil
  deserialized_payload = Payload.deserialize_payload(
    payload,
    encryption_context: activity_encryption_context(payload)
  )
  apply_schedule_execution_identity(deserialized_payload)
  audit_context = audit_started(deserialized_payload)

  side_effects = BestEffortSideEffects.new(audit_context)
  result = perform_with_best_effort_observability(deserialized_payload, side_effects) do
    perform_deserialized_job(deserialized_payload, raw_arguments) do |resolved_job_class|
      job_class = resolved_job_class
    end
  end
  record_success_side_effects(payload, audit_context, side_effects)
  result
rescue StandardError => e
  handle_activity_error(e, job_class, audit_context, deserialized_payload || payload)
ensure
  clear_idempotency_key
end