Class: ActiveJob::Temporal::WorkflowEnqueuer

Inherits:
Object
Includes:
WorkflowEnqueuerBatch
Defined in:
lib/activejob/temporal/workflow_enqueuer.rb

Overview

Service object for enqueueing jobs as Temporal workflows.

This class handles the mechanics of converting an ActiveJob into a Temporal workflow execution, including payload serialization, workflow ID generation, and options building.


Examples:

Using with a job

enqueuer = WorkflowEnqueuer.new(client, config)
handle = enqueuer.enqueue(job, scheduled_at: 5.minutes.from_now) # => workflow run handle

Direct usage

client = ActiveJob::Temporal.client
config = ActiveJob::Temporal.config
enqueuer = WorkflowEnqueuer.new(client, config)
enqueuer.enqueue(job)

Instance Method Summary

Methods included from WorkflowEnqueuerBatch

#enqueue_batch

Constructor Details

#initialize(client, config, logger = nil, workflow_id_builder: nil, payload_builder: nil) ⇒ WorkflowEnqueuer

Returns a new instance of WorkflowEnqueuer.

Parameters:

  • client (Temporalio::Client, #call)

    Temporal client connection, or a callable that is stored as a client provider

  • config (ActiveJob::Temporal::Configuration)

    Configuration object

  • logger (Logger) (defaults to: nil)

    Optional logger instance; falls back to config.logger when nil

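  • workflow_id_builder (WorkflowIdBuilder) (defaults to: nil)

    Builder for Temporal workflow IDs; when nil, a WorkflowIdBuilder is created from the configured workflow ID generator

  • payload_builder (JobPayloadBuilder) (defaults to: nil)

    Builder for job payloads; when nil, a JobPayloadBuilder is created from config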



# File 'lib/activejob/temporal/workflow_enqueuer.rb', line 35

def initialize(client, config, logger = nil, workflow_id_builder: nil, payload_builder: nil)
  @client_provider = client if client.respond_to?(:call)
  @client = client unless @client_provider
  @config = config
  @logger = logger || config.logger
  @workflow_id_builder = workflow_id_builder || WorkflowIdBuilder.new(configured_workflow_id_generator)
  @payload_builder = payload_builder || JobPayloadBuilder.new(config)
end
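
The constructor stores a callable as a client provider and anything else as the client itself. A minimal sketch of that pattern follows. `ClientResolverSketch` and its `client` reader are hypothetical names, and calling the provider on each access is an assumption, since the source shows only how the arguments are stored.

```ruby
# Hypothetical sketch of the "client or callable" pattern used by the constructor.
# How the real class resolves @client_provider is not shown in the source;
# calling it on every access is an assumption made for illustration.
class ClientResolverSketch
  def initialize(client)
    # Mirror the constructor: a callable is kept as a provider,
    # anything else is kept as the client.
    @client_provider = client if client.respond_to?(:call)
    @client = client unless @client_provider
  end

  # Assumed reader: call the provider when present, otherwise return the stored client.
  def client
    @client_provider ? @client_provider.call : @client
  end
end

direct = ClientResolverSketch.new(:fake_client)
lazy = ClientResolverSketch.new(-> { :lazily_built_client })

puts direct.client.inspect
puts lazy.client.inspect
```

Passing a lambda this way would let a caller defer connecting to Temporal until the first enqueue, which is one plausible reason for accepting a callable.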

Instance Method Details

#enqueue(job, scheduled_at: nil) ⇒ Object

Enqueue a job as a Temporal workflow.

Performs validation, builds the payload, generates a workflow ID, constructs workflow options, and starts the workflow via the Temporal client.

Examples:

Immediate execution

enqueuer.enqueue(job) # => workflow handle

Scheduled execution

enqueuer.enqueue(job, scheduled_at: 1.hour.from_now) # => workflow handle

Duplicate job (FAIL conflict policy)

enqueuer.enqueue(job) # => handle
enqueuer.enqueue(job) # raises DuplicateEnqueueError
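
The duplicate case can be tried without a Temporal server by using an in-memory stand-in. Everything below is hypothetical: `InMemoryEnqueuer` and the locally defined `DuplicateEnqueueError` only imitate the documented behavior of rejecting a second enqueue for the same workflow ID, and deriving the ID from `job_id` is an assumption.

```ruby
require "set"

# Local stand-in; the gem's real DuplicateEnqueueError and its namespace
# are not shown in the source.
class DuplicateEnqueueError < StandardError; end

# Hypothetical in-memory enqueuer that rejects repeated workflow IDs.
class InMemoryEnqueuer
  def initialize
    @seen = Set.new
  end

  def enqueue(job)
    workflow_id = "wf-#{job.fetch(:job_id)}"
    # Set#add? returns nil when the element is already present.
    raise DuplicateEnqueueError, "already enqueued: #{workflow_id}" unless @seen.add?(workflow_id)

    workflow_id
  end
end

enqueuer = InMemoryEnqueuer.new
job = { job_id: "42" }

enqueuer.enqueue(job)
begin
  enqueuer.enqueue(job)
rescue DuplicateEnqueueError => e
  # A caller may choose to treat a duplicate as already handled.
  puts "skipped duplicate: #{e.message}"
end
```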

Parameters:

  • job (ActiveJob::Base)

    The job to enqueue

  • scheduled_at (Time, nil) (defaults to: nil)

    Time at which to run the job; nil runs it immediately

Returns:

  • (Object)

    Workflow run handle

Raises:

  • (ActiveJob::SerializationError)

    If payload serialization fails or exceeds max size

  • (ActiveJob::EnqueueError)

    If workflow cannot be started

  • (ActiveJob::Temporal::ConfigurationError)

    If job configuration is invalid



# File 'lib/activejob/temporal/workflow_enqueuer.rb', line 66

def enqueue(job, scheduled_at: nil)
  validate_job_for_enqueueing(job)
  scheduled_at = validate_scheduled_at!(scheduled_at)
  workflow_id = @workflow_id_builder.build(job)
  payload = build_payload(job, workflow_id: workflow_id, scheduled_at: scheduled_at)
  enqueue_with_payload(job, payload, workflow_id)
end
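
The order of operations in #enqueue (validate, build the workflow ID, build the payload, start the workflow) can be sketched with plain Ruby stand-ins. `PipelineSketch`, `FakeJob`, and the three lambdas are hypothetical; the validation rules shown are assumptions and not the gem's actual checks.

```ruby
# Hypothetical stand-in for a job; not an ActiveJob::Base subclass.
FakeJob = Struct.new(:job_id, :arguments)

# Hypothetical pipeline that follows the same step order as #enqueue.
class PipelineSketch
  def initialize(id_builder:, payload_builder:, starter:)
    @id_builder = id_builder
    @payload_builder = payload_builder
    @starter = starter
  end

  def enqueue(job, scheduled_at: nil)
    # Step 1: validate inputs (assumed rules, for illustration only).
    raise ArgumentError, "job is required" if job.nil?
    if !scheduled_at.nil? && !scheduled_at.is_a?(Time)
      raise ArgumentError, "scheduled_at must be a Time or nil"
    end

    # Step 2: build the workflow ID before the payload, since the payload embeds it.
    workflow_id = @id_builder.call(job)
    # Step 3: build the payload from the job, the ID, and the schedule.
    payload = @payload_builder.call(job, workflow_id, scheduled_at)
    # Step 4: hand both to whatever starts the workflow.
    @starter.call(workflow_id, payload)
  end
end

sketch = PipelineSketch.new(
  id_builder: ->(job) { "job-#{job.job_id}" },
  payload_builder: ->(job, id, at) { { workflow_id: id, args: job.arguments, scheduled_at: at } },
  starter: ->(id, payload) { { started: id, payload: payload } }
)

result = sketch.enqueue(FakeJob.new("abc123", [1, 2]))
puts result[:started]
```

Injecting the builders as collaborators, as the real constructor does with workflow_id_builder and payload_builder, keeps each step replaceable in tests.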