Class: ActiveJob::Temporal::Activities::AjRunnerActivity
- Inherits:
-
Temporalio::Activity::Definition
- Object
- Temporalio::Activity::Definition
- ActiveJob::Temporal::Activities::AjRunnerActivity
- Defined in:
- lib/activejob/temporal/activities/aj_runner_activity.rb
Overview
Idempotency and Retries Activities may be re-executed on transient failures due to Temporal’s retry logic. Job implementations MUST be idempotent. This activity sets an execution-local idempotency key (‘Fiber`) derived from the workflow ID to assist with idempotent external operations (e.g., API requests).
Execution-Local Idempotency Key Jobs can access ‘Fiber` to generate unique idempotency tokens for external API calls. `Thread.current` remains populated for existing synchronous jobs. The key format is “workflow_id/runner” and persists across retries for the same workflow execution.
Exception Handling If the job raises an exception that matches a ‘discard_on` declaration, the activity raises a non-retryable `ApplicationError` to stop retries. Otherwise, the exception propagates and Temporal applies the retry policy.
Temporal activity that executes the actual ActiveJob logic.
This activity hydrates the job class, deserializes arguments, and invokes the job’s ‘perform` method. It is the only place where side effects occur (database writes, API calls, etc.).
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: RetryRequested
Constant Summary collapse
- IDEMPOTENCY_KEY =
:aj_temporal_idempotency_key- DESERIALIZATION_ERROR_CLASSES =
[ ActiveJob::SerializationError, ActiveJob::DeserializationError ].freeze
Instance Method Summary collapse
-
#execute(payload, raw_arguments = nil) ⇒ Object?
Executes the job inside the Temporal activity context.
Instance Method Details
#execute(payload, raw_arguments = nil) ⇒ Object?
Executes the job inside the Temporal activity context.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/activejob/temporal/activities/aj_runner_activity.rb', line 120 def execute(payload, raw_arguments = nil) job_class = nil audit_context = nil deserialized_payload = Payload.deserialize_payload( payload, encryption_context: activity_encryption_context(payload) ) apply_schedule_execution_identity(deserialized_payload) audit_context = audit_started(deserialized_payload) side_effects = BestEffortSideEffects.new(audit_context) result = perform_with_best_effort_observability(deserialized_payload, side_effects) do perform_deserialized_job(deserialized_payload, raw_arguments) do |resolved_job_class| job_class = resolved_job_class end end record_success_side_effects(payload, audit_context, side_effects) result rescue StandardError => e handle_activity_error(e, job_class, audit_context, deserialized_payload || payload) ensure clear_idempotency_key end |