Module: ActiveJob::Temporal
- Extended by:
- Configurable
- Defined in:
- lib/activejob/temporal.rb,
lib/activejob/temporal/cancel.rb,
lib/activejob/temporal/client.rb,
lib/activejob/temporal/logger.rb,
lib/activejob/temporal/adapter.rb,
lib/activejob/temporal/inspect.rb,
lib/activejob/temporal/payload.rb,
lib/activejob/temporal/version.rb,
lib/activejob/temporal/job_tags.rb,
lib/activejob/temporal/schedule.rb,
lib/activejob/temporal/tls_file.rb,
lib/activejob/temporal/audit_log.rb,
lib/activejob/temporal/middleware.rb,
lib/activejob/temporal/bind_policy.rb,
lib/activejob/temporal/schedulable.rb,
lib/activejob/temporal/worker_pool.rb,
lib/activejob/temporal/configurable.rb,
lib/activejob/temporal/retry_mapper.rb,
lib/activejob/temporal/signal_query.rb,
lib/activejob/temporal/chain_options.rb,
lib/activejob/temporal/configuration.rb,
lib/activejob/temporal/observability.rb,
lib/activejob/temporal/worker_health.rb,
lib/activejob/temporal/batch_enqueuer.rb,
lib/activejob/temporal/job_descriptor.rb,
lib/activejob/temporal/metrics_server.rb,
lib/activejob/temporal/payload_storage.rb,
lib/activejob/temporal/http_line_reader.rb,
lib/activejob/temporal/middleware/chain.rb,
lib/activejob/temporal/schedule_options.rb,
lib/activejob/temporal/temporal_options.rb,
lib/activejob/temporal/visibility_query.rb,
lib/activejob/temporal/dead_letter_queue.rb,
lib/activejob/temporal/search_attributes.rb,
lib/activejob/temporal/workflow_enqueuer.rb,
lib/activejob/temporal/workflow_identity.rb,
lib/activejob/temporal/dependency_options.rb,
lib/activejob/temporal/external_operation.rb,
lib/activejob/temporal/payload_encryption.rb,
lib/activejob/temporal/rate_limit_options.rb,
lib/activejob/temporal/transaction_safety.rb,
lib/activejob/temporal/certificate_watcher.rb,
lib/activejob/temporal/conditional_enqueue.rb,
lib/activejob/temporal/health_check_server.rb,
lib/activejob/temporal/job_payload_builder.rb,
lib/activejob/temporal/payload_serializers.rb,
lib/activejob/temporal/reload_signal_queue.rb,
lib/activejob/temporal/workflow_id_builder.rb,
lib/activejob/temporal/batch_enqueue_result.rb,
lib/activejob/temporal/cancel/batch_summary.rb,
lib/activejob/temporal/rate_limiters/memory.rb,
lib/activejob/temporal/signal_query_options.rb,
lib/activejob/temporal/observability/datadog.rb,
lib/activejob/temporal/workflows/aj_workflow.rb,
lib/activejob/temporal/cancel/batch_canceller.rb,
lib/activejob/temporal/child_workflow_options.rb,
lib/activejob/temporal/connection_worker_pool.rb,
lib/activejob/temporal/worker_client_reloader.rb,
lib/activejob/temporal/job_payload_rate_limits.rb,
lib/activejob/temporal/retry_handler_extractor.rb,
lib/activejob/temporal/workflow_enqueuer_batch.rb,
lib/activejob/temporal/job_payload_dependencies.rb,
lib/activejob/temporal/observability/prometheus.rb,
lib/activejob/temporal/payload_serializers/json.rb,
lib/activejob/temporal/rails_environment_loader.rb,
lib/activejob/temporal/workflows/workflow_nexus.rb,
lib/activejob/temporal/active_job_handler_source.rb,
lib/activejob/temporal/job_payload_chain_builder.rb,
lib/activejob/temporal/job_payload_child_workflows.rb,
lib/activejob/temporal/observability/opentelemetry.rb,
lib/activejob/temporal/payload_serializers/marshal.rb,
lib/activejob/temporal/workflows/workflow_chaining.rb,
lib/activejob/temporal/configured_job_compatibility.rb,
lib/activejob/temporal/activities/aj_runner_activity.rb,
lib/activejob/temporal/workflows/dead_letter_support.rb,
lib/activejob/temporal/workflows/workflow_versioning.rb,
lib/activejob/temporal/activities/rate_limit_activity.rb,
lib/activejob/temporal/dead_letter_payload_validation.rb,
lib/activejob/temporal/workflows/dead_letter_workflow.rb,
lib/activejob/temporal/workflows/workflow_dependencies.rb,
lib/activejob/temporal/workflows/workflow_interactions.rb,
lib/activejob/temporal/payload_serializers/message_pack.rb,
lib/activejob/temporal/job_payload_workflow_interactions.rb,
lib/activejob/temporal/workflows/workflow_child_workflows.rb,
lib/activejob/temporal/workflows/workflow_continue_as_new.rb,
lib/activejob/temporal/workflows/workflow_execution_steps.rb,
lib/activejob/temporal/activities/best_effort_side_effects.rb,
lib/activejob/temporal/workflows/workflow_local_activities.rb,
lib/activejob/temporal/activities/dependency_status_activity.rb
Overview
ActiveJob adapter for Temporal workflow orchestration.
This gem provides a durable, fault-tolerant execution backend for Rails ActiveJob by leveraging Temporal’s workflow engine. Jobs are executed as Temporal workflows with automatic retries, scheduling, and observability.
Defined Under Namespace
Modules: Activities, Adapter, AuditLog, BindPolicy, Cancel, ChainOptions, ChildWorkflowOptions, Client, ConditionalEnqueue, Configurable, ConfiguredConditionalEnqueue, ConfiguredJobCompatibility, DeadLetterPayloadValidation, DeadLetterQueue, DependencyOptions, HttpLineReader, Inspect, JobPayloadChainBuilder, JobPayloadChildWorkflows, JobPayloadDependencies, JobPayloadRateLimits, JobPayloadWorkflowInteractions, JobTags, Logger, Middleware, Observability, Payload, PayloadEncryption, PayloadSerializers, PayloadStorage, RailsEnvironmentLoader, RateLimitOptions, RateLimiters, RetryMapper, Schedulable, SearchAttributes, SignalQuery, SignalQueryOptions, TLSFile, TemporalOptions, TransactionSafety, VisibilityQuery, WorkflowEnqueuerBatch, WorkflowIdentity, Workflows
Classes: ActiveJobHandlerSource, BatchEnqueueItemResult, BatchEnqueueResult, BatchEnqueueValidationError, BatchEnqueuer, CertificateWatcher, ConfigValidator, Configuration, ConfigurationError, ConnectionWorkerPool, DuplicateEnqueueError, Error, ExternalOperation, ExternalOperationOptions, HealthCheckServer, JobDescriptor, JobPayloadBuilder, MetricsServer, ReloadSignalQueue, RetryHandlerExtractor, Schedule, ScheduleOptions, TemporalConnectionError, WorkerClientReloader, WorkerHealth, WorkerPool, WorkflowEnqueuer, WorkflowIdBuilder, WorkflowNotFoundError
Constant Summary collapse
- VERSION =
"0.1.0"
- LOCALE_PATH =
File.expand_path("locales/en.yml", __dir__)
- VALIDATION_LEVELS =
%i[strict warn none].freeze
- PAYLOAD_SERIALIZERS =
PayloadSerializers::SUPPORTED
- LOCAL_ACTIVITY_HELPERS =
%i[rate_limit].freeze
- UNTRAPPABLE_SIGNALS =
%w[CHLD INT KILL PIPE QUIT STOP TERM].freeze
- POSITIONAL_PARAMETER_TYPES =
%i[req opt rest].freeze
- MAX_TARGET_HOST_LENGTH =
253
- MAX_NAMESPACE_LENGTH =
1000
- TARGET_HOST_LABEL_PATTERN =
/\A[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\z/
- TARGET_PORT_PATTERN =
/\A[1-9]\d{0,4}\z/
- NAMESPACE_PATTERN =
/\A[A-Za-z0-9](?:[A-Za-z0-9_-]*[A-Za-z0-9])?\z/
- CONFIGURATION_ATTRIBUTES =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Central registry of all configuration attributes.
This is the single source of truth for attribute names, types, defaults, and env vars. Provides metadata for:
-
Automatic attribute accessor generation
-
Default value initialization (with lazy evaluation via Proc)
-
Environment variable mapping (e.g., ACTIVEJOB_TEMPORAL_TARGET)
-
Type information for validation and type coercion
-
{ # Connection Settings target: { default: "127.0.0.1:7233", env_var: "ACTIVEJOB_TEMPORAL_TARGET", type: :string, description: "Temporal server host:port" }, namespace: { default: "default", env_var: "ACTIVEJOB_TEMPORAL_NAMESPACE", type: :string, description: "Temporal namespace" }, task_queue_prefix: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_TASK_QUEUE_PREFIX", type: :string, description: "Optional prefix for task queue names" }, task_queue: { default: "default", env_var: "ACTIVEJOB_TEMPORAL_TASK_QUEUE", type: :string, description: "Default task queue name for workers" }, tls: { default: nil, type: :object, description: "Optional SDK-native TLS options or hash-compatible TLS settings" }, tls_cert_path: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_TLS_CERT_PATH", type: :string, description: "Optional client certificate file path for mTLS" }, tls_key_path: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_TLS_KEY_PATH", type: :string, description: "Optional client private key file path for mTLS" }, tls_server_root_ca_cert_path: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_TLS_SERVER_ROOT_CA_CERT_PATH", type: :string, description: "Optional server root CA certificate file path for TLS verification" }, tls_domain: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_TLS_DOMAIN", type: :string, description: "Optional TLS SNI domain override" }, tls_cert_watch: { default: false, env_var: "ACTIVEJOB_TEMPORAL_TLS_CERT_WATCH", type: :boolean, description: "Watch TLS certificate files and reload worker clients when they change" }, tls_reload_signal: { default: "HUP", env_var: "ACTIVEJOB_TEMPORAL_TLS_RELOAD_SIGNAL", type: :string, description: "Signal used by workers to reload TLS certificates manually" }, priority_task_queues: { default: -> { {} }, type: :hash, description: "Optional mapping from numeric ActiveJob priority values to Temporal task queues" }, workflow_id_generator: { default: nil, type: :callable, description: "Optional callable for custom Temporal workflow IDs" }, 
rate_limiter: { default: nil, type: :object, description: "Optional limiter backend responding to #wait_time_for or #call" }, global_rate_limit: { default: nil, type: :hash, description: "Optional global rate limit hash, for example { limit: 1000, per: :minute }" }, # Timeouts (use Proc for lazy evaluation of ActiveSupport::Duration) default_activity_timeout: { default: -> { 15.minutes }, type: :duration, description: "Default start_to_close_timeout for activity execution" }, default_heartbeat_timeout: { default: nil, type: :duration, description: "Default heartbeat_timeout for activity execution (optional)" }, default_schedule_to_start_timeout: { default: nil, type: :duration, description: "Default schedule_to_start_timeout for activity execution (optional)" }, default_schedule_to_close_timeout: { default: nil, type: :duration, description: "Default schedule_to_close_timeout for activity execution (optional)" }, default_retry_initial_interval: { default: -> { 30.seconds }, type: :duration, description: "Initial retry interval" }, # Retry Settings default_retry_backoff: { default: 2.0, type: :float, description: "Backoff coefficient for exponential retry" }, default_retry_max_attempts: { default: 1, type: :integer, description: "Maximum retry attempts for activities" }, dead_letter_queue: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_DEAD_LETTER_QUEUE", type: :string, description: "Optional Temporal task queue for failed job dead letter workflows" }, dead_letter_after_attempts: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_DEAD_LETTER_AFTER_ATTEMPTS", type: :integer, description: "Optional retry attempt limit before routing jobs to the dead letter queue" }, dead_letter_auto_discard_after: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_DEAD_LETTER_AUTO_DISCARD_AFTER_SECONDS", type: :duration, description: "Optional time to keep dead letter workflows queryable before auto-discarding them" }, # Observability logger: { default: lambda { (defined?(Rails) && 
Rails.respond_to?(:logger) && Rails.logger) || ::Logger.new($stdout) }, type: :object, description: "Logger instance for gem output" }, audit_log: { default: false, env_var: "ACTIVEJOB_TEMPORAL_AUDIT_LOG", type: :boolean, description: "Enable structured audit events for job lifecycle changes" }, audit_logger: { default: nil, type: :object, description: "Optional logger instance for audit events; falls back to logger" }, validation_level: { default: :strict, type: :symbol, description: "Configuration validation behavior: :strict, :warn, or :none" }, observability: { default: -> { Observability::Configuration.new }, type: :object, description: "Optional observability adapters and trace propagation settings" }, middleware_chain: { default: -> { Middleware::Chain.new }, type: :object, description: "Ordered middleware chain for activity job execution" }, identity: { default: nil, type: :string, description: "Optional worker identity for observability" }, # Payload & Performance max_payload_size_kb: { default: 250, env_var: "ACTIVEJOB_TEMPORAL_MAX_PAYLOAD_SIZE_KB", type: :integer, description: "Maximum job payload size in kilobytes" }, payload_serializer: { default: :json, env_var: "ACTIVEJOB_TEMPORAL_PAYLOAD_SERIALIZER", type: :symbol, description: "Payload serializer for job execution data: :json, :message_pack, :msgpack, or :marshal" }, payload_storage_adapter: { default: nil, type: :object, description: "Optional external payload storage adapter responding to #dump and #load" }, payload_storage_threshold_kb: { default: nil, type: :integer, description: "Optional payload size threshold in kilobytes for external payload storage" }, encrypt_payload: { default: false, env_var: "ACTIVEJOB_TEMPORAL_ENCRYPT_PAYLOAD", type: :boolean, description: "Encrypt serialized job execution payloads before sending them to Temporal" }, encryption_key: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_ENCRYPTION_KEY", type: :object, description: "Base64-encoded 32-byte AES-256-GCM payload 
encryption key or key metadata" }, encryption_old_keys: { default: -> { [] }, type: :array, description: "Previous payload encryption keys or key metadata accepted for decryption" }, enable_search_attributes: { default: true, type: :boolean, description: "Enable Temporal search attributes for job metadata" }, continue_as_new_history_event_threshold: { default: nil, env_var: "ACTIVEJOB_TEMPORAL_CONTINUE_AS_NEW_HISTORY_EVENT_THRESHOLD", type: :integer, description: "Optional workflow history event threshold for continuing ActiveJob workflows as new" }, local_activity_helpers: { default: -> { [] }, type: :array, description: "Internal helper activity names that should run as Temporal local activities" }, max_concurrent_activities: { default: 100, env_var: "ACTIVEJOB_TEMPORAL_MAX_CONCURRENT_ACTIVITIES", type: :integer, description: "Maximum concurrent activities per worker" }, max_concurrent_workflow_tasks: { default: 5, env_var: "ACTIVEJOB_TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASKS", type: :integer, description: "Maximum concurrent workflow tasks per worker" } }.freeze
Class Method Summary collapse
- .activity(temporal_type) ⇒ Object
-
.cancel(job_class, job_id) ⇒ Boolean?
Cancels a running or scheduled job by job ID.
- .cancel_all(job_class) ⇒ Object
- .cancel_where(filters) ⇒ Object
-
.client ⇒ Temporalio::Client
Returns the memoized Temporal client connection for the process.
- .completed?(job_class, job_id) ⇒ Boolean
- .dead_letter_entries(queue: nil, limit: DeadLetterQueue::DEFAULT_ENTRIES_LIMIT) ⇒ Object
- .dead_letter_entry(job_class, job_id) ⇒ Object
- .discard_dead_letter(job_class, job_id, reason: nil) ⇒ Object
- .enqueue_batch(items, concurrency: 1) ⇒ Object
- .failed?(job_class, job_id) ⇒ Boolean
- .job(job_class, **options) ⇒ Object
- .query(job_class, job_id, query_name, reject_condition: SignalQuery::DEFAULT_REJECT_CONDITION) ⇒ Object
- .reload_client! ⇒ Object
- .retry_dead_letter(job_class, job_id, queue: nil) ⇒ Object
- .running?(job_class, job_id) ⇒ Boolean
- .signal(job_class, job_id, signal_name) ⇒ Object
- .status(job_class, job_id) ⇒ Object
- .update(job_class, job_id, update_name) ⇒ Object
- .workflow(temporal_type) ⇒ Object
Methods included from Configurable
Class Method Details
.activity(temporal_type) ⇒ Object
226 227 228 |
# File 'lib/activejob/temporal.rb', line 226 def activity(temporal_type, **) ExternalOperation.activity(temporal_type, **) end |
.cancel(job_class, job_id) ⇒ Boolean?
Cancellation Requires Heartbeating: For a job to respond to cancellation, it must check for cancellation by heartbeating or by polling Temporalio::Activity::Context.current.cancelled?. Without heartbeating, a long-running activity will run to completion before it detects the cancellation request.
Cancels a running or scheduled job by job ID.
This method requests cancellation for the Temporal workflow associated with the job. Cancellation is asynchronous and best-effort: the job will stop only if it is actively heartbeating. See Cancel module documentation for details.
206 207 208 |
# File 'lib/activejob/temporal.rb', line 206 def cancel(job_class, job_id) Cancel.cancel(job_class, job_id) end |
.cancel_all(job_class) ⇒ Object
210 211 212 |
# File 'lib/activejob/temporal.rb', line 210 def cancel_all(job_class) Cancel.cancel_all(job_class) end |
.cancel_where(filters) ⇒ Object
214 215 216 |
# File 'lib/activejob/temporal.rb', line 214 def cancel_where(filters) Cancel.cancel_where(filters) end |
.client ⇒ Temporalio::Client
Returns the memoized Temporal client connection for the process.
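The client is connected to the Temporal server specified in the configuration. TLS options can be provided via configuration attributes or the matching environment variables:
-
ACTIVEJOB_TEMPORAL_TLS_CERT_PATH: client certificate file path for mTLS
-
ACTIVEJOB_TEMPORAL_TLS_KEY_PATH: client private key file path for mTLS
-
ACTIVEJOB_TEMPORAL_TLS_SERVER_ROOT_CA_CERT_PATH: server root CA certificate file path for TLS verification
-
ACTIVEJOB_TEMPORAL_TLS_DOMAIN: TLS SNI domain override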
145 146 147 148 149 |
# File 'lib/activejob/temporal.rb', line 145 def client client_mutex.synchronize do @client ||= Client.build(config) end end |
.completed?(job_class, job_id) ⇒ Boolean
258 259 260 |
# File 'lib/activejob/temporal.rb', line 258 def completed?(job_class, job_id) Inspect.completed?(job_class, job_id) end |
.dead_letter_entries(queue: nil, limit: DeadLetterQueue::DEFAULT_ENTRIES_LIMIT) ⇒ Object
270 271 272 |
# File 'lib/activejob/temporal.rb', line 270 def dead_letter_entries(queue: nil, limit: DeadLetterQueue::DEFAULT_ENTRIES_LIMIT) DeadLetterQueue.entries(queue: queue, limit: limit) end |
.dead_letter_entry(job_class, job_id) ⇒ Object
266 267 268 |
# File 'lib/activejob/temporal.rb', line 266 def dead_letter_entry(job_class, job_id) DeadLetterQueue.entry(job_class, job_id) end |
.discard_dead_letter(job_class, job_id, reason: nil) ⇒ Object
278 279 280 |
# File 'lib/activejob/temporal.rb', line 278 def discard_dead_letter(job_class, job_id, reason: nil) DeadLetterQueue.discard(job_class, job_id, reason: reason) end |
.enqueue_batch(items, concurrency: 1) ⇒ Object
218 219 220 221 222 223 224 |
# File 'lib/activejob/temporal.rb', line 218 def enqueue_batch(items, concurrency: 1) WorkflowEnqueuer.new( -> { client }, config, config.logger ).enqueue_batch(items, concurrency: concurrency) end |
.failed?(job_class, job_id) ⇒ Boolean
262 263 264 |
# File 'lib/activejob/temporal.rb', line 262 def failed?(job_class, job_id) Inspect.failed?(job_class, job_id) end |
.job(job_class, **options) ⇒ Object
234 235 236 |
# File 'lib/activejob/temporal.rb', line 234 def job(job_class, **options) JobDescriptor.new(job_class, options) end |
.query(job_class, job_id, query_name, reject_condition: SignalQuery::DEFAULT_REJECT_CONDITION) ⇒ Object
246 247 248 |
# File 'lib/activejob/temporal.rb', line 246 def query(job_class, job_id, query_name, *, reject_condition: SignalQuery::DEFAULT_REJECT_CONDITION) SignalQuery.query(job_class, job_id, query_name, *, reject_condition: reject_condition) end |
.reload_client! ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/activejob/temporal.rb', line 151 def reload_client! fresh_client = Client.build(config) begin yield fresh_client if block_given? rescue StandardError close_client(fresh_client) raise end previous_client = nil client_mutex.synchronize do previous_client = @client @client = fresh_client end close_client(previous_client) unless previous_client.equal?(fresh_client) fresh_client end |
.retry_dead_letter(job_class, job_id, queue: nil) ⇒ Object
274 275 276 |
# File 'lib/activejob/temporal.rb', line 274 def retry_dead_letter(job_class, job_id, queue: nil) DeadLetterQueue.retry(job_class, job_id, queue: queue) end |
.running?(job_class, job_id) ⇒ Boolean
254 255 256 |
# File 'lib/activejob/temporal.rb', line 254 def running?(job_class, job_id) Inspect.running?(job_class, job_id) end |
.signal(job_class, job_id, signal_name) ⇒ Object
242 243 244 |
# File 'lib/activejob/temporal.rb', line 242 def signal(job_class, job_id, signal_name, *) SignalQuery.signal(job_class, job_id, signal_name, *) end |
.status(job_class, job_id) ⇒ Object
238 239 240 |
# File 'lib/activejob/temporal.rb', line 238 def status(job_class, job_id) Inspect.status(job_class, job_id) end |
.update(job_class, job_id, update_name) ⇒ Object
250 251 252 |
# File 'lib/activejob/temporal.rb', line 250 def update(job_class, job_id, update_name, *) SignalQuery.update(job_class, job_id, update_name, *) end |
.workflow(temporal_type) ⇒ Object
230 231 232 |
# File 'lib/activejob/temporal.rb', line 230 def workflow(temporal_type, **) ExternalOperation.workflow(temporal_type, **) end |