Class: ActiveJob::Temporal::Workflows::AjWorkflow

Inherits:
Temporalio::Workflow::Definition
  • Object
Includes:
DeadLetterSupport, WorkflowChaining, WorkflowChildWorkflows, WorkflowContinueAsNew, WorkflowDependencies, WorkflowExecutionSteps, WorkflowInteractions, WorkflowLocalActivities, WorkflowNexus, WorkflowVersioning
Defined in:
lib/activejob/temporal/workflows/aj_workflow.rb

Overview

Note:

Workflow Determinism: This workflow MUST remain deterministic. It performs no I/O, generates no random numbers, reads time only through `Workflow.now` (never the system clock), and makes no direct calls to external services. All side effects occur in the activity layer. Temporal replays workflow code from the recorded event history whenever a worker resumes an execution, so non-deterministic changes cause workflow execution errors.

Note:

Non-Blocking Sleep: The `Workflow.sleep` method uses Temporal’s durable timers, so scheduled jobs do not occupy worker resources while waiting. The workflow state is persisted, and Temporal wakes the workflow at the scheduled time.

Deterministic orchestration workflow for ActiveJob execution.

This workflow serves as the durable scheduling and orchestration layer for ActiveJob. It handles delayed execution (via `Workflow.sleep`) and invokes the activity that executes the actual job logic.

Examples:

Workflow execution flow

1. Extract scheduled_at timestamp from payload
2. If scheduled_at is in the future, sleep until that time (non-blocking)
3. Execute AjRunnerActivity with the payload and retry policy
4. Return activity result
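
The scheduling decision in steps 1 and 2 could be sketched in plain Ruby as follows. `delay_until` is a hypothetical helper, not part of the gem, and the current time is passed in explicitly as a stand-in for `Workflow.now` so the calculation stays deterministic.

```ruby
require "time"

# Hypothetical helper (not part of the gem): returns how many seconds to
# wait before running the job, or 0.0 when it should run immediately.
# `now` is injected by the caller; inside a real workflow it would come
# from the workflow clock rather than Time.now.
def delay_until(payload, now)
  scheduled_at = payload[:scheduled_at]
  return 0.0 if scheduled_at.nil?

  delay = Time.iso8601(scheduled_at) - now
  delay.positive? ? delay : 0.0
end

now = Time.iso8601("2025-10-31T11:00:00Z")
future_delay = delay_until({ scheduled_at: "2025-10-31T12:00:00Z" }, now) # one hour ahead
past_delay   = delay_until({ scheduled_at: "2025-10-31T10:00:00Z" }, now) # already past
no_schedule  = delay_until({}, now)                                       # no scheduled_at
```

A non-zero result is the duration a workflow would hand to its timer; a zero result means the activity can start right away.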

Replay behavior

# If a worker crashes during step 3, Temporal replays the workflow from its event history:
# - Step 1: Re-reads scheduled_at (deterministic)
# - Step 2: Skips the sleep (the timer already fired and is replayed from history)
# - Step 3: Resumes at the activity call, reusing any result already recorded in history
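
To make the replay idea concrete, here is a toy model in plain Ruby. It is an illustration only and not how Temporal is implemented: `ToyHistory`, `run_job`, and the step names are all hypothetical. Completed steps are recorded, and a replayed run returns the recorded result instead of executing the step again.

```ruby
# Toy model of history-based replay (illustration only, not Temporal's
# implementation). Each completed step is recorded; on replay, recorded
# steps return their stored result instead of running again.
class ToyHistory
  attr_reader :executed

  def initialize(events = [])
    @events = events   # results of steps completed before the crash
    @cursor = 0        # position of the next step during this run
    @executed = []     # names of steps that actually ran in this run
  end

  def step(name)
    index = @cursor
    @cursor += 1
    return @events[index] if index < @events.length # replayed from history

    @executed << name
    result = yield
    @events << result
    result
  end
end

# Hypothetical workflow body: the same three steps on every run.
def run_job(history)
  history.step(:read_schedule) { "2025-10-31T12:00:00Z" }
  history.step(:sleep) { :timer_fired }
  history.step(:activity) { "job result" }
end

# The first run got through the timer before the worker crashed, so the
# history already holds two results.
replay = ToyHistory.new(["2025-10-31T12:00:00Z", :timer_fired])
result = run_job(replay)
# Only the activity step executes during the replay; the first two steps
# are answered from the recorded history.
```

The sketch also shows why determinism matters: if the workflow body issued its steps in a different order on replay, the recorded results would be matched to the wrong steps.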

Constant Summary

DEFAULT_START_TO_CLOSE_TIMEOUT = 900.0
RATE_LIMIT_ACTIVITY_TIMEOUT = 30.0
RATE_LIMIT_RETRY_POLICY = Temporalio::RetryPolicy.new(max_attempts: 1)

Constants included from WorkflowVersioning

WorkflowVersioning::PATCHES

Constants included from WorkflowInteractions

WorkflowInteractions::HANDLER_NAME_PATTERN

Constants included from WorkflowDependencies

WorkflowDependencies::COMPLETED_DEPENDENCY_STATES, WorkflowDependencies::DEPENDENCY_CHECK_ACTIVITY_TIMEOUT, WorkflowDependencies::DEPENDENCY_CHECK_RETRY_POLICY, WorkflowDependencies::DEPENDENCY_NOT_FOUND_MAX_CHECKS, WorkflowDependencies::DEPENDENCY_WAIT_INTERVAL, WorkflowDependencies::FAILED_DEPENDENCY_STATES, WorkflowDependencies::NOT_FOUND_DEPENDENCY_STATES

Constants included from WorkflowChaining

WorkflowChaining::WORKFLOW_CONTROL_FIELDS

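Instance Method Summary

  • #execute(payload) ⇒ Object?

    Executes the workflow: optionally sleeps until scheduled time, then runs the activity.

  • #handle_dynamic_query(query_name, *args) ⇒ Object

  • #handle_dynamic_signal(signal_name, *args) ⇒ Object

  • #handle_dynamic_update(update_name, *args) ⇒ Object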

Instance Method Details

#execute(payload) ⇒ Object?

Note:

Durable Timers: If scheduled_at is in the future, the workflow creates a durable timer. The timer persists across worker restarts and does not block worker threads.

Note:

Durable Timer Guarantees: Temporal’s durable timers persist across worker restarts and cluster outages. Even if all workers are down, the scheduled job executes once workers are back online. The timer is recorded in Temporal’s event history, which makes it reliable for long-term scheduling.

Executes the workflow: optionally sleeps until scheduled time, then runs the activity.

Examples:

Immediate execution

execute({ job_class: "MyJob", job_id: "123", arguments: ["arg1"] })

Scheduled execution (non-blocking sleep)

execute({
  job_class: "MyJob",
  job_id: "123",
  scheduled_at: "2025-10-31T12:00:00Z",
  arguments: []
})
# Workflow sleeps until scheduled time without consuming worker resources

Replay behavior on worker restart

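# Initial execution: Workflow starts a 1-hour durable timer
# Worker crashes after 30 minutes
# Workflow is replayed: the timer is restored from history, not restarted.
# If the hour has already passed when a worker picks the workflow up again,
# the activity executes immediately; otherwise the workflow waits only for
# the remaining time.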

Parameters:

  • payload (Hash)

    Job payload containing execution metadata

Options Hash (payload):

  • :job_class (String)

    Fully-qualified job class name (required)

  • :job_id (String)

    Unique job identifier (required)

  • :queue_name (String)

    Target queue name (required)

  • :arguments (Array)

    Serialized job arguments (required)

  • :default_activity_options (Hash)

    Global activity timeout defaults (required)

  • :retry_policy (Hash)

    Retry policy for activity execution (required)

  • :temporal_options (Hash)

    Per-job timeout configuration (optional)

  • :scheduled_at (String)

    ISO8601 timestamp for delayed execution (optional)

  • :executions (Integer)

    Current execution count (default: 0)

  • :exception_executions (Hash)

    Exception execution counts (default: {})
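
As a reading aid for the option list above, the sketch below checks the keys marked as required and fills in the two documented defaults. `normalize_payload` and `REQUIRED_PAYLOAD_KEYS` are hypothetical names, and symbol keys are assumed to match the examples; whether the workflow itself validates the payload this way is not stated here.

```ruby
# Keys the option list marks as required.
REQUIRED_PAYLOAD_KEYS = %i[
  job_class job_id queue_name arguments default_activity_options retry_policy
].freeze

# Hypothetical helper (not part of the gem): rejects payloads that lack a
# required key and fills in the documented defaults for the counters.
def normalize_payload(payload)
  missing = REQUIRED_PAYLOAD_KEYS.reject { |key| payload.key?(key) }
  raise ArgumentError, "missing payload keys: #{missing.join(', ')}" unless missing.empty?

  # Defaults come first so that values supplied by the caller win.
  { executions: 0, exception_executions: {} }.merge(payload)
end

payload = normalize_payload(
  {
    job_class: "MyJob",
    job_id: "123",
    queue_name: "default",
    arguments: ["arg1"],
    default_activity_options: {},
    retry_policy: {}
  }
)
# payload[:executions] and payload[:exception_executions] now hold the
# documented defaults (0 and {}).
```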

Returns:

  • (Object, nil)

    Result from the activity execution

Raises:

  • (Temporalio::Error::ActivityError)

    if activity execution fails (propagates from activity)

  • (Temporalio::Error::TimeoutError)

    if activity exceeds start_to_close_timeout

# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 153

def execute(payload)
  payload = schedule_execution_payload(payload)
  current_activity_payload = payload
  configure_workflow_state(payload)
  configure_workflow_interactions(payload)

  result = execute_workflow_steps(payload) do |chain_payload|
    current_activity_payload = chain_payload
  end

  workflow_state["phase"] = "completed"
  result
rescue Temporalio::Error::ActivityError => e
  workflow_state["phase"] = "failed"
  if dead_letterable_failure?(current_activity_payload, e)
    start_dead_letter_workflow(current_activity_payload, e)
  end
  raise
end
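
The listing keeps `current_activity_payload` up to date through the block, so the rescue clause knows which step of a chain was running when the error surfaced. The sketch below reproduces that pattern in plain Ruby under stated assumptions: `FakeActivityError`, `run_steps`, and `run_chain` are hypothetical stand-ins, and the dead-letter step is reduced to appending to an array.

```ruby
# Stand-in for Temporalio::Error::ActivityError so the sketch runs without
# the temporalio gem.
class FakeActivityError < StandardError; end

# Hypothetical stand-in for execute_workflow_steps: yields each payload
# before running it, and fails on the payload flagged with :fail.
def run_steps(payloads)
  payloads.each do |payload|
    yield payload
    raise FakeActivityError, "#{payload[:job_class]} failed" if payload[:fail]
  end
  :ok
end

def run_chain(payloads, dead_letters)
  current = payloads.first
  run_steps(payloads) { |chain_payload| current = chain_payload }
rescue FakeActivityError => e
  # `current` names the step that was running when the error surfaced.
  dead_letters << { job_class: current[:job_class], error: e.message }
  raise
end

dead_letters = []
chain = [{ job_class: "FirstJob" }, { job_class: "SecondJob", fail: true }]
begin
  run_chain(chain, dead_letters)
rescue FakeActivityError
  # The error still propagates to the caller after the dead-letter record.
end
# dead_letters now holds one entry, for SecondJob rather than FirstJob.
```

Without the block assignment, the rescue clause would only see the first payload and would attribute the failure to the wrong job.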

#handle_dynamic_query(query_name, *args) ⇒ Object



# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 86

def handle_dynamic_query(query_name, *args)
  handler_name = normalize_handler_name!(query_name, "query")

  case handler_name
  when "state" then deep_copy(workflow_state)
  when "paused" then workflow_state["paused"]
  when "pause_reason" then workflow_state["pause_reason"]
  when "phase" then workflow_state["phase"]
  when "signals" then deep_copy(workflow_state["signals"])
  else dispatch_custom_query(handler_name, args)
  end
end
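
The "state" and "signals" queries return `deep_copy(...)` rather than the live object. One plausible reason is to keep a caller from mutating workflow state through the returned value. The sketch below shows that effect; the `Marshal`-based `deep_copy` is an assumption, since the gem's own implementation is not shown in this page.

```ruby
# One possible deep_copy (an assumption, not the gem's implementation).
# Marshal round-tripping works for plain hashes, arrays, strings, and
# numbers, which is what this example state contains.
def deep_copy(value)
  Marshal.load(Marshal.dump(value))
end

workflow_state = { "phase" => "running", "signals" => [{ "name" => "pause" }] }

# A query handler would hand out the copy, never the original.
snapshot = deep_copy(workflow_state)
snapshot["signals"] << { "name" => "injected" }
snapshot["phase"] = "tampered"

# workflow_state is unchanged: the caller only modified its own copy,
# including the nested "signals" array.
```

A shallow `dup` would not be enough here, because the nested "signals" array would still be shared between the copy and the original.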

#handle_dynamic_signal(signal_name, *args) ⇒ Object



# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 75

def handle_dynamic_signal(signal_name, *args)
  handler_name = normalize_handler_name!(signal_name, "signal")

  case handler_name
  when "pause" then pause_workflow(args)
  when "resume" then resume_workflow(args)
  else dispatch_custom_signal(handler_name, args)
  end
end
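
The dispatch shape in this handler, with built-in names matched first and everything else forwarded to custom handlers, can be modelled in plain Ruby. `ToySignals` is hypothetical: the handler registry, the treatment of the first argument as a pause reason, and the error raised for unknown names are all assumptions, not behavior confirmed by the listing.

```ruby
# Toy dispatcher (hypothetical; not the gem's code): built-in names are
# matched first, anything else is looked up in a registry of custom handlers.
class ToySignals
  attr_reader :state

  def initialize
    @state = { "paused" => false, "pause_reason" => nil }
    @custom = {}
  end

  # Registers a custom handler under the given signal name.
  def on(name, &handler)
    @custom[name.to_s] = handler
  end

  def handle(signal_name, *args)
    case signal_name.to_s
    when "pause"
      @state["paused"] = true
      @state["pause_reason"] = args.first # assumed: first argument is the reason
    when "resume"
      @state["paused"] = false
      @state["pause_reason"] = nil
    else
      handler = @custom.fetch(signal_name.to_s) do
        raise ArgumentError, "unknown signal: #{signal_name}"
      end
      handler.call(*args)
    end
  end
end

signals = ToySignals.new
received = []
signals.on("note") { |text| received << text }

signals.handle("pause", "maintenance window")
signals.handle("note", "hello")
signals.handle("resume")
```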

#handle_dynamic_update(update_name, *args) ⇒ Object



# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 100

def handle_dynamic_update(update_name, *args)
  handler_name = normalize_handler_name!(update_name, "update")

  dispatch_custom_update(handler_name, args)
end