Class: Conductor::Workflow::WorkflowExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/workflow/workflow_executor.rb

Overview

WorkflowExecutor provides a high-level interface for executing workflows. It supports both synchronous (wait for completion) and asynchronous execution.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = nil, event_dispatcher: nil, logger: nil) ⇒ WorkflowExecutor

Initialize WorkflowExecutor

Parameters:



25
26
27
28
29
30
31
32
33
# File 'lib/conductor/workflow/workflow_executor.rb', line 25

# Builds an executor wired to the workflow, metadata and task resource APIs.
#
# @param configuration [Configuration, nil] client configuration; a new
#   Configuration is created when none is given
# @param event_dispatcher [Object, nil] dispatcher to store; a new
#   Worker::Events::SyncEventDispatcher is created when none is given
# @param logger [Logger, nil] logger to store; defaults to a Logger that
#   writes to File::NULL
def initialize(configuration = nil, event_dispatcher: nil, logger: nil)
  @configuration = configuration || Configuration.new
  @logger = logger || Logger.new(File::NULL)

  # A single HTTP client is shared by all three resource API wrappers.
  shared_client = Http::ApiClient.new(configuration: @configuration)
  @workflow_api = Http::Api::WorkflowResourceApi.new(shared_client)
  @metadata_api = Http::Api::MetadataResourceApi.new(shared_client)
  @task_api = Http::Api::TaskResourceApi.new(shared_client)

  @event_dispatcher = event_dispatcher || Worker::Events::SyncEventDispatcher.new
end

Instance Attribute Details

#event_dispatcher ⇒ Object (readonly)

Returns the value of attribute event_dispatcher.



20
21
22
# File 'lib/conductor/workflow/workflow_executor.rb', line 20

# Read-only accessor for the event dispatcher stored on this executor.
#
# @return [Object] the current value of @event_dispatcher
def event_dispatcher
  instance_variable_get(:@event_dispatcher)
end

#metadata_api ⇒ Object (readonly)

Returns the value of attribute metadata_api.



20
21
22
# File 'lib/conductor/workflow/workflow_executor.rb', line 20

# Read-only accessor for the metadata resource API client.
#
# @return [Object] the current value of @metadata_api
def metadata_api
  @metadata_api
end

#task_api ⇒ Object (readonly)

Returns the value of attribute task_api.



20
21
22
# File 'lib/conductor/workflow/workflow_executor.rb', line 20

# Read-only accessor for the task resource API client.
#
# @return [Object] the current value of @task_api
def task_api
  instance_variable_get(:@task_api)
end

#workflow_api ⇒ Object (readonly)

Returns the value of attribute workflow_api.



20
21
22
# File 'lib/conductor/workflow/workflow_executor.rb', line 20

# Read-only accessor for the workflow resource API client.
#
# @return [Object] the current value of @workflow_api
def workflow_api
  instance_variable_get(:@workflow_api)
end

Instance Method Details

#execute(name, input: {}, version: nil, correlation_id: nil, domain: nil, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun

Execute a workflow by name with input (convenience method)

Parameters:

  • name (String)

    Workflow name

  • input (Hash) (defaults to: {})

    Workflow input (default: {})

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

  • correlation_id (String) (defaults to: nil)

    Correlation ID (optional)

  • domain (String) (defaults to: nil)

    Task domain for all tasks (optional)

  • wait_until_task_ref (String) (defaults to: nil)

    Wait until this task completes (optional)

  • wait_for_seconds (Integer) (defaults to: 10)

    Maximum time to wait (default: 10)

  • request_id (String) (defaults to: nil)

    Unique request ID (optional)

Returns:

  • (WorkflowRun)

    Workflow run result



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/conductor/workflow/workflow_executor.rb', line 99

# Convenience wrapper: builds a StartWorkflowRequest from plain arguments
# and hands it to #execute_workflow.
#
# @param name [String] workflow name
# @param input [Hash] workflow input
# @param version [Integer, nil] workflow version
# @param correlation_id [String, nil] correlation ID
# @param domain [String, nil] when given, mapped to every task via the
#   '*' key of task_to_domain
# @param wait_until_task_ref [String, nil] forwarded to #execute_workflow
# @param wait_for_seconds [Integer] forwarded to #execute_workflow
# @param request_id [String, nil] forwarded to #execute_workflow
# @return [WorkflowRun] whatever #execute_workflow returns
def execute(name, input: {}, version: nil, correlation_id: nil, domain: nil,
            wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil)
  start_request = Http::Models::StartWorkflowRequest.new(
    name: name, input: input, version: version, correlation_id: correlation_id
  )

  # Only set the domain mapping when a domain was actually supplied.
  if domain
    start_request.task_to_domain = { '*' => domain }
  end

  wait_options = {
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds,
    request_id: request_id
  }
  execute_workflow(start_request, **wait_options)
end

#execute_and_wait(name, input: {}, timeout_seconds: 60, **options) ⇒ Workflow

Execute a workflow and wait for completion (with polling fallback)

Parameters:

  • name (String)

    Workflow name

  • input (Hash) (defaults to: {})

    Workflow input

  • timeout_seconds (Integer) (defaults to: 60)

    Maximum time to wait

  • options (Hash)

    Additional options (version, correlation_id, etc.)

Returns:



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/conductor/workflow/workflow_executor.rb', line 319

# Runs a workflow and waits for it to finish: first through the server-side
# wait of #execute, then (if the run is still in progress) by polling with
# #wait_for_workflow.
#
# The time already spent in the server-side wait is deducted from the
# polling budget so the overall wait does not exceed +timeout_seconds+.
# As a consequence, a Timeout::Error raised by #wait_for_workflow reports
# the remaining budget rather than the full +timeout_seconds+.
#
# @param name [String] workflow name
# @param input [Hash] workflow input
# @param timeout_seconds [Numeric] maximum total time to wait
# @param options [Hash] extra keyword arguments forwarded to #execute
#   (version, correlation_id, etc.)
# @return [Workflow] result of #wait_for_workflow or #get_workflow
# @raise [Timeout::Error] propagated from #wait_for_workflow
def execute_and_wait(name, input: {}, timeout_seconds: 60, **options)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # First try synchronous execution
  result = execute(
    name,
    input: input,
    wait_for_seconds: [timeout_seconds, 30].min, # Server-side wait capped at 30s typically
    **options
  )

  # Already finished: just fetch the execution details.
  return get_workflow(result.workflow_id) unless result.running?

  # Still running: poll, but only for what is left of the caller's budget.
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  remaining = [timeout_seconds - elapsed, 0].max
  wait_for_workflow(result.workflow_id, timeout_seconds: remaining)
end

#execute_workflow(request, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun

Execute a workflow synchronously and wait for completion

Parameters:

  • request (StartWorkflowRequest)

    Start workflow request

  • wait_until_task_ref (String) (defaults to: nil)

    Wait until this task completes (optional)

  • wait_for_seconds (Integer) (defaults to: 10)

    Maximum time to wait (default: 10)

  • request_id (String) (defaults to: nil)

    Unique request ID for idempotency (auto-generated if not provided)

Returns:

  • (WorkflowRun)

    Workflow run result



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/conductor/workflow/workflow_executor.rb', line 76

# Executes a workflow through the workflow API's synchronous endpoint.
#
# @param request [StartWorkflowRequest] start request; its name and version
#   are also passed as explicit options (version falls back to 1)
# @param wait_until_task_ref [String, nil] forwarded to the API
# @param wait_for_seconds [Integer] forwarded to the API
# @param request_id [String, nil] request ID; a random UUID is generated
#   when not provided
# @return [WorkflowRun] whatever @workflow_api.execute_workflow returns
def execute_workflow(request, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil)
  effective_request_id = request_id || SecureRandom.uuid

  call_options = {
    name: request.name,
    version: request.version || 1,
    request_id: effective_request_id,
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  }
  @workflow_api.execute_workflow(request, **call_options)
end

#get_by_correlation_id(workflow_name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>

Get workflows by correlation ID

Parameters:

  • workflow_name (String)

    Workflow name

  • correlation_id (String)

    Correlation ID

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Array<Workflow>)

    List of workflows



261
262
263
264
265
266
267
268
# File 'lib/conductor/workflow/workflow_executor.rb', line 261

# Looks up workflows of one definition by a single correlation ID.
#
# @param workflow_name [String] workflow name
# @param correlation_id [String] correlation ID
# @param include_closed [Boolean] include closed workflows
# @param include_tasks [Boolean] include task details
# @return [Array<Workflow>] whatever @workflow_api.get_workflows returns
def get_by_correlation_id(workflow_name, correlation_id, include_closed: false, include_tasks: false)
  filters = { include_closed: include_closed, include_tasks: include_tasks }
  @workflow_api.get_workflows(workflow_name, correlation_id, **filters)
end

#get_by_correlation_ids(workflow_name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>

Get workflows by multiple correlation IDs

Parameters:

  • workflow_name (String)

    Workflow name

  • correlation_ids (Array<String>)

    List of correlation IDs

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Hash<String, Array<Workflow>>)

    Map of correlation ID to workflows



276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/conductor/workflow/workflow_executor.rb', line 276

# Looks up workflows for several correlation IDs, one request per ID.
#
# @param workflow_name [String] workflow name
# @param correlation_ids [Array<String>] correlation IDs to query
# @param include_closed [Boolean] include closed workflows
# @param include_tasks [Boolean] include task details
# @return [Hash<String, Array<Workflow>>] correlation ID => result of
#   #get_by_correlation_id for that ID
def get_by_correlation_ids(workflow_name, correlation_ids, include_closed: false, include_tasks: false)
  # NOTE: This would require a batch API endpoint; for now, iterate
  correlation_ids.each_with_object({}) do |id, workflows_by_id|
    workflows_by_id[id] = get_by_correlation_id(
      workflow_name, id, include_closed: include_closed, include_tasks: include_tasks
    )
  end
end

#get_task(task_id) ⇒ Task

Get a task by ID

Parameters:

  • task_id (String)

    Task ID

Returns:

  • (Task)

    Task object



247
248
249
# File 'lib/conductor/workflow/workflow_executor.rb', line 247

# Fetches a task through the task API.
#
# @param task_id [String] task ID
# @return [Task] whatever @task_api.get_task returns
def get_task(task_id)
  tasks = @task_api
  tasks.get_task(task_id)
end

#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow

Get workflow execution details

Parameters:

  • workflow_id (String)

    Workflow ID

  • include_tasks (Boolean) (defaults to: true)

    Include task details (default: true)

Returns:



125
126
127
# File 'lib/conductor/workflow/workflow_executor.rb', line 125

# Fetches workflow execution details via the API's execution-status call.
#
# @param workflow_id [String] workflow ID
# @param include_tasks [Boolean] include task details
# @return [Workflow] whatever @workflow_api.get_execution_status returns
def get_workflow(workflow_id, include_tasks: true)
  query = { include_tasks: include_tasks }
  @workflow_api.get_execution_status(workflow_id, **query)
end

#get_workflow_status(workflow_id, include_output: false, include_variables: false) ⇒ Hash

Get workflow status (lightweight)

Parameters:

  • workflow_id (String)

    Workflow ID

  • include_output (Boolean) (defaults to: false)

    Include workflow output (default: false)

  • include_variables (Boolean) (defaults to: false)

    Include workflow variables (default: false)

Returns:

  • (Hash)

    Workflow status



134
135
136
137
138
139
140
# File 'lib/conductor/workflow/workflow_executor.rb', line 134

# Fetches the lightweight status of a workflow.
#
# @param workflow_id [String] workflow ID
# @param include_output [Boolean] include workflow output
# @param include_variables [Boolean] include workflow variables
# @return [Hash] whatever @workflow_api.get_workflow_status returns
def get_workflow_status(workflow_id, include_output: false, include_variables: false)
  detail_flags = { include_output: include_output, include_variables: include_variables }
  @workflow_api.get_workflow_status(workflow_id, **detail_flags)
end

#pause(workflow_id) ⇒ void

This method returns an undefined value.

Pause a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID



149
150
151
# File 'lib/conductor/workflow/workflow_executor.rb', line 149

# Pauses a running workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @return [void]
def pause(workflow_id)
  api = @workflow_api
  api.pause_workflow(workflow_id)
end

#register_workflow(workflow, overwrite: true) ⇒ Object

Register a workflow definition

Parameters:

  • workflow (WorkflowDef, ConductorWorkflow)

    Workflow definition or ConductorWorkflow DSL

  • overwrite (Boolean) (defaults to: true)

    Overwrite existing definition (default: true)

Returns:

  • (Object)

    Response



43
44
45
46
# File 'lib/conductor/workflow/workflow_executor.rb', line 43

# Registers a workflow definition through the metadata API.
#
# @param workflow [WorkflowDef, ConductorWorkflow] a definition, or any
#   object responding to #to_workflow_def (converted before registration)
# @param overwrite [Boolean] overwrite an existing definition
# @return [Object] whatever @metadata_api.update_workflows returns
def register_workflow(workflow, overwrite: true)
  definition =
    if workflow.respond_to?(:to_workflow_def)
      workflow.to_workflow_def
    else
      workflow
    end

  # The metadata API takes a list, so wrap the single definition.
  @metadata_api.update_workflows([definition], overwrite: overwrite)
end

#remove_workflow(workflow_id, archive_workflow: true) ⇒ void

This method returns an undefined value.

Remove (delete) a workflow permanently

Parameters:

  • workflow_id (String)

    Workflow ID

  • archive_workflow (Boolean) (defaults to: true)

    Archive before deleting (default: true)



196
197
198
# File 'lib/conductor/workflow/workflow_executor.rb', line 196

# Removes (deletes) a workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param archive_workflow [Boolean] archive before deleting
# @return [void]
def remove_workflow(workflow_id, archive_workflow: true)
  delete_options = { archive_workflow: archive_workflow }
  @workflow_api.delete(workflow_id, **delete_options)
end

#rerun(workflow_id, rerun_request) ⇒ String

Rerun a workflow from a specific task

Parameters:

  • workflow_id (String)

    Workflow ID

  • rerun_request (Hash, RerunWorkflowRequest)

    Rerun configuration

Returns:

  • (String)

    New workflow ID



188
189
190
# File 'lib/conductor/workflow/workflow_executor.rb', line 188

# Reruns a workflow from a specific task via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param rerun_request [Hash, RerunWorkflowRequest] rerun configuration
# @return [String] whatever @workflow_api.rerun returns (new workflow ID)
def rerun(workflow_id, rerun_request)
  api = @workflow_api
  api.rerun(workflow_id, rerun_request)
end

#restart(workflow_id, use_latest_definitions: false) ⇒ void

This method returns an undefined value.

Restart a completed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • use_latest_definitions (Boolean) (defaults to: false)

    Use latest workflow definition (default: false)



172
173
174
# File 'lib/conductor/workflow/workflow_executor.rb', line 172

# Restarts a completed workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param use_latest_definitions [Boolean] passed to the API as
#   +use_latest_def+
# @return [void]
def restart(workflow_id, use_latest_definitions: false)
  restart_options = { use_latest_def: use_latest_definitions }
  @workflow_api.restart(workflow_id, **restart_options)
end

#resume(workflow_id) ⇒ void

This method returns an undefined value.

Resume a paused workflow

Parameters:

  • workflow_id (String)

    Workflow ID



156
157
158
# File 'lib/conductor/workflow/workflow_executor.rb', line 156

# Resumes a paused workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @return [void]
def resume(workflow_id)
  api = @workflow_api
  api.resume_workflow(workflow_id)
end

#retry(workflow_id, resume_subworkflow_tasks: false) ⇒ void

This method returns an undefined value.

Retry a failed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • resume_subworkflow_tasks (Boolean) (defaults to: false)

    Resume subworkflow tasks (default: false)



180
181
182
# File 'lib/conductor/workflow/workflow_executor.rb', line 180

# Retries a failed workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param resume_subworkflow_tasks [Boolean] resume subworkflow tasks
# @return [void]
def retry(workflow_id, resume_subworkflow_tasks: false)
  retry_options = { resume_subworkflow_tasks: resume_subworkflow_tasks }
  @workflow_api.retry(workflow_id, **retry_options)
end

#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void

This method returns an undefined value.

Skip a task in a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_reference_name (String)

    Task reference name to skip

  • request (Hash) (defaults to: nil)

    Skip task request (optional)



205
206
207
# File 'lib/conductor/workflow/workflow_executor.rb', line 205

# Skips a task in a running workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param task_reference_name [String] reference name of the task to skip
# @param request [Hash, nil] skip task request
# @return [void]
def skip_task_from_workflow(workflow_id, task_reference_name, request: nil)
  skip_options = { request: request }
  @workflow_api.skip_task_from_workflow(workflow_id, task_reference_name, **skip_options)
end

#start_workflow(request) ⇒ String

Start a workflow asynchronously (returns immediately with workflow ID)

Parameters:

  • request (StartWorkflowRequest)

    Start workflow request

Returns:

  • (String)

    Workflow ID



55
56
57
58
59
60
61
# File 'lib/conductor/workflow/workflow_executor.rb', line 55

# Starts a workflow asynchronously and returns the API's result.
#
# Calls publish_workflow_input_size before starting. Any StandardError
# raised by that call or by the API is passed to
# publish_workflow_start_error and then re-raised unchanged.
#
# @param request [StartWorkflowRequest] start workflow request
# @return [String] whatever @workflow_api.start_workflow returns
#   (workflow ID)
def start_workflow(request)
  begin
    publish_workflow_input_size(request)
    workflow_id = @workflow_api.start_workflow(request)
  rescue StandardError => error
    publish_workflow_start_error(request, error)
    raise
  end

  workflow_id
end

#start_workflows(*requests) ⇒ Array<String>

Start multiple workflows

Parameters:

  • requests (Array<StartWorkflowRequest>)

    List of start workflow requests

Returns:

  • (Array<String>)

    List of workflow IDs



66
67
68
# File 'lib/conductor/workflow/workflow_executor.rb', line 66

# Starts several workflows, one #start_workflow call per request.
#
# @param requests [Array<StartWorkflowRequest>] requests, given either as
#   separate arguments or as (nested) arrays, which are flattened
# @return [Array<String>] results of #start_workflow, in request order
def start_workflows(*requests)
  pending = requests.flatten
  pending.map do |start_request|
    start_workflow(start_request)
  end
end

#terminate(workflow_id, reason: nil) ⇒ void

This method returns an undefined value.

Terminate a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • reason (String) (defaults to: nil)

    Termination reason (optional)



164
165
166
# File 'lib/conductor/workflow/workflow_executor.rb', line 164

# Terminates a running workflow via the workflow API.
#
# @param workflow_id [String] workflow ID
# @param reason [String, nil] termination reason
# @return [void]
def terminate(workflow_id, reason: nil)
  terminate_options = { reason: reason }
  @workflow_api.terminate(workflow_id, **terminate_options)
end

#update_task(workflow_id, task_id, task_output, status) ⇒ String

Update a task result

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_id (String)

    Task ID

  • task_output (Hash)

    Task output data

  • status (String)

    Task status (COMPLETED, FAILED, etc.)

Returns:

  • (String)

    Task update result



219
220
221
222
223
224
225
226
227
# File 'lib/conductor/workflow/workflow_executor.rb', line 219

# Builds a TaskResult from the given values and submits it to the task API.
#
# @param workflow_id [String] workflow ID (stored as workflow_instance_id)
# @param task_id [String] task ID
# @param task_output [Hash] task output data (stored as output_data)
# @param status [String] task status (COMPLETED, FAILED, etc.)
# @return [String] whatever @task_api.update_task returns
def update_task(workflow_id, task_id, task_output, status)
  result_attributes = {
    workflow_instance_id: workflow_id,
    task_id: task_id,
    output_data: task_output,
    status: status
  }
  outcome = Http::Models::TaskResult.new(**result_attributes)
  @task_api.update_task(outcome)
end

#update_task_by_ref_name(workflow_id, task_reference_name, task_output, status) ⇒ String

Update a task by reference name

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_reference_name (String)

    Task reference name

  • task_output (Hash)

    Task output data

  • status (String)

    Task status

Returns:

  • (String)

    Task update result



235
236
237
238
239
240
241
242
# File 'lib/conductor/workflow/workflow_executor.rb', line 235

# Updates a task addressed by workflow ID and task reference name.
#
# @param workflow_id [String] workflow ID
# @param task_reference_name [String] task reference name (passed to the
#   API as +task_ref_name+)
# @param task_output [Hash] task output data (first positional argument
#   of the API call)
# @param status [String] task status
# @return [String] whatever @task_api.update_task_by_ref_name returns
def update_task_by_ref_name(workflow_id, task_reference_name, task_output, status)
  target = { workflow_id: workflow_id, task_ref_name: task_reference_name, status: status }
  @task_api.update_task_by_ref_name(task_output, **target)
end

#wait_for_workflow(workflow_id, timeout_seconds: 60, poll_interval_seconds: 1.0) ⇒ Workflow

Wait for a workflow to complete

Parameters:

  • workflow_id (String)

    Workflow ID

  • timeout_seconds (Integer) (defaults to: 60)

    Maximum time to wait (default: 60)

  • poll_interval_seconds (Float) (defaults to: 1.0)

    Polling interval (default: 1.0)

Returns:

Raises:

  • (Timeout::Error)

    If workflow doesn't complete within timeout



300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/conductor/workflow/workflow_executor.rb', line 300

# Polls a workflow until it reports a terminal state or the timeout expires.
#
# The deadline is tracked on the monotonic clock, so wall-clock adjustments
# do not shorten or extend the wait. The pause between polls is clamped to
# the time left, so the call does not sleep past the deadline.
#
# @param workflow_id [String] workflow ID
# @param timeout_seconds [Numeric] maximum time to wait
# @param poll_interval_seconds [Numeric] pause between polls
# @return [Workflow] the first polled workflow for which #terminal? is true
# @raise [Timeout::Error] if the workflow is still not terminal when the
#   deadline has passed
def wait_for_workflow(workflow_id, timeout_seconds: 60, poll_interval_seconds: 1.0)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds

  loop do
    # Task details are not needed just to check for a terminal state.
    workflow = get_workflow(workflow_id, include_tasks: false)
    return workflow if workflow.terminal?

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    if remaining <= 0
      raise Timeout::Error, "Workflow #{workflow_id} did not complete within #{timeout_seconds} seconds"
    end

    # Never sleep beyond the deadline; the next iteration polls once more
    # and then raises if the workflow is still running.
    sleep([poll_interval_seconds, remaining].min)
  end
end