Class: ActiveJob::Temporal::Workflows::AjWorkflow
- Inherits:
-
Temporalio::Workflow::Definition
- Object
- Temporalio::Workflow::Definition
- ActiveJob::Temporal::Workflows::AjWorkflow
- Includes:
- DeadLetterSupport, WorkflowChaining, WorkflowChildWorkflows, WorkflowContinueAsNew, WorkflowDependencies, WorkflowExecutionSteps, WorkflowInteractions, WorkflowLocalActivities, WorkflowNexus, WorkflowVersioning
- Defined in:
- lib/activejob/temporal/workflows/aj_workflow.rb
Overview
Workflow Determinism This workflow MUST remain deterministic. It contains no I/O operations, no random number generation, no system time calls (only ‘Workflow.now`), and no direct method calls to external services. All side effects occur in the activity layer. Temporal replays workflow code on every restart, so non-deterministic changes will cause workflow execution errors.
Non-Blocking Sleep The ‘Workflow.sleep` method uses Temporal’s durable timer mechanism. This means scheduled jobs do not consume worker resources while waiting. The workflow is persisted, and Temporal wakes it up at the scheduled time.
Deterministic orchestration workflow for ActiveJob execution.
This workflow serves as the durable scheduling and orchestration layer for ActiveJob. It handles delayed execution (via ‘Workflow.sleep`) and invokes the activity that executes the actual job logic.
Constant Summary collapse
- DEFAULT_START_TO_CLOSE_TIMEOUT =
900.0- RATE_LIMIT_ACTIVITY_TIMEOUT =
30.0- RATE_LIMIT_RETRY_POLICY =
Temporalio::RetryPolicy.new(max_attempts: 1)
Constants included from WorkflowVersioning
Constants included from WorkflowInteractions
WorkflowInteractions::HANDLER_NAME_PATTERN
Constants included from WorkflowDependencies
WorkflowDependencies::COMPLETED_DEPENDENCY_STATES, WorkflowDependencies::DEPENDENCY_CHECK_ACTIVITY_TIMEOUT, WorkflowDependencies::DEPENDENCY_CHECK_RETRY_POLICY, WorkflowDependencies::DEPENDENCY_NOT_FOUND_MAX_CHECKS, WorkflowDependencies::DEPENDENCY_WAIT_INTERVAL, WorkflowDependencies::FAILED_DEPENDENCY_STATES, WorkflowDependencies::NOT_FOUND_DEPENDENCY_STATES
Constants included from WorkflowChaining
WorkflowChaining::WORKFLOW_CONTROL_FIELDS
Instance Method Summary collapse
-
#execute(payload) ⇒ Object?
Executes the workflow: optionally sleeps until scheduled time, then runs the activity.
- #handle_dynamic_query(query_name, *args) ⇒ Object
- #handle_dynamic_signal(signal_name, *args) ⇒ Object
- #handle_dynamic_update(update_name, *args) ⇒ Object
Instance Method Details
#execute(payload) ⇒ Object?
Durable Timers If scheduled_at is in the future, the workflow creates a durable timer. The timer persists across worker restarts and does not block worker threads.
Durable Timer Guarantees Temporal’s durable timers persist across worker restarts and cluster outages. Even if all workers are down, the scheduled job will execute once workers are back online. The timer is stored in Temporal’s event history, making it highly reliable for long-term scheduling.
Executes the workflow: optionally sleeps until scheduled time, then runs the activity.
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 153 def execute(payload) payload = schedule_execution_payload(payload) current_activity_payload = payload configure_workflow_state(payload) configure_workflow_interactions(payload) result = execute_workflow_steps(payload) do |chain_payload| current_activity_payload = chain_payload end workflow_state["phase"] = "completed" result rescue Temporalio::Error::ActivityError => e workflow_state["phase"] = "failed" if dead_letterable_failure?(current_activity_payload, e) start_dead_letter_workflow(current_activity_payload, e) end raise end |
#handle_dynamic_query(query_name, *args) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 86 def handle_dynamic_query(query_name, *args) handler_name = normalize_handler_name!(query_name, "query") case handler_name when "state" then deep_copy(workflow_state) when "paused" then workflow_state["paused"] when "pause_reason" then workflow_state["pause_reason"] when "phase" then workflow_state["phase"] when "signals" then deep_copy(workflow_state["signals"]) else dispatch_custom_query(handler_name, args) end end |
#handle_dynamic_signal(signal_name, *args) ⇒ Object
75 76 77 78 79 80 81 82 83 |
# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 75 def handle_dynamic_signal(signal_name, *args) handler_name = normalize_handler_name!(signal_name, "signal") case handler_name when "pause" then pause_workflow(args) when "resume" then resume_workflow(args) else dispatch_custom_signal(handler_name, args) end end |
#handle_dynamic_update(update_name, *args) ⇒ Object
100 101 102 103 104 |
# File 'lib/activejob/temporal/workflows/aj_workflow.rb', line 100 def handle_dynamic_update(update_name, *args) handler_name = normalize_handler_name!(update_name, "update") dispatch_custom_update(handler_name, args) end |