Class: Conductor::Workflow::WorkflowExecutor
- Inherits:
-
Object
- Object
- Conductor::Workflow::WorkflowExecutor
- Defined in:
- lib/conductor/workflow/workflow_executor.rb
Overview
WorkflowExecutor provides a high-level interface for executing workflows Supports both synchronous (wait for completion) and asynchronous execution
Instance Attribute Summary collapse
-
#event_dispatcher ⇒ Object
readonly
Returns the value of attribute event_dispatcher.
-
#metadata_api ⇒ Object
readonly
Returns the value of attribute metadata_api.
-
#task_api ⇒ Object
readonly
Returns the value of attribute task_api.
-
#workflow_api ⇒ Object
readonly
Returns the value of attribute workflow_api.
Instance Method Summary collapse
-
#execute(name, input: {}, version: nil, correlation_id: nil, domain: nil, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun
Execute a workflow by name with input (convenience method).
-
#execute_and_wait(name, input: {}, timeout_seconds: 60, **options) ⇒ Workflow
Execute a workflow and wait for completion (with polling fallback).
-
#execute_workflow(request, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun
Execute a workflow synchronously and wait for completion.
-
#get_by_correlation_id(workflow_name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>
Get workflows by correlation ID.
-
#get_by_correlation_ids(workflow_name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>
Get workflows by multiple correlation IDs.
-
#get_task(task_id) ⇒ Task
Get a task by ID.
-
#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow
Get workflow execution details.
-
#get_workflow_status(workflow_id, include_output: false, include_variables: false) ⇒ Hash
Get workflow status (lightweight).
-
#initialize(configuration = nil, event_dispatcher: nil, logger: nil) ⇒ WorkflowExecutor
constructor
Initialize WorkflowExecutor.
-
#pause(workflow_id) ⇒ void
Pause a running workflow.
-
#register_workflow(workflow, overwrite: true) ⇒ Object
Register a workflow definition.
-
#remove_workflow(workflow_id, archive_workflow: true) ⇒ void
Remove (delete) a workflow permanently.
-
#rerun(workflow_id, rerun_request) ⇒ String
Rerun a workflow from a specific task.
-
#restart(workflow_id, use_latest_definitions: false) ⇒ void
Restart a completed workflow.
-
#resume(workflow_id) ⇒ void
Resume a paused workflow.
-
#retry(workflow_id, resume_subworkflow_tasks: false) ⇒ void
Retry a failed workflow.
-
#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void
Skip a task in a running workflow.
-
#start_workflow(request) ⇒ String
Start a workflow asynchronously (returns immediately with workflow ID).
-
#start_workflows(*requests) ⇒ Array<String>
Start multiple workflows.
-
#terminate(workflow_id, reason: nil) ⇒ void
Terminate a running workflow.
-
#update_task(workflow_id, task_id, task_output, status) ⇒ String
Update a task result.
-
#update_task_by_ref_name(workflow_id, task_reference_name, task_output, status) ⇒ String
Update a task by reference name.
-
#wait_for_workflow(workflow_id, timeout_seconds: 60, poll_interval_seconds: 1.0) ⇒ Workflow
Wait for a workflow to complete.
Constructor Details
#initialize(configuration = nil, event_dispatcher: nil, logger: nil) ⇒ WorkflowExecutor
Initialize WorkflowExecutor
25 26 27 28 29 30 31 32 33 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 25 def initialize(configuration = nil, event_dispatcher: nil, logger: nil) @configuration = configuration || Configuration.new @logger = logger || Logger.new(File::NULL) api_client = Http::ApiClient.new(configuration: @configuration) @workflow_api = Http::Api::WorkflowResourceApi.new(api_client) @metadata_api = Http::Api::MetadataResourceApi.new(api_client) @task_api = Http::Api::TaskResourceApi.new(api_client) @event_dispatcher = event_dispatcher || Worker::Events::SyncEventDispatcher.new end |
Instance Attribute Details
#event_dispatcher ⇒ Object (readonly)
Returns the value of attribute event_dispatcher.
20 21 22 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 20 def event_dispatcher @event_dispatcher end |
#metadata_api ⇒ Object (readonly)
Returns the value of attribute metadata_api.
20 21 22 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 20 def @metadata_api end |
#task_api ⇒ Object (readonly)
Returns the value of attribute task_api.
20 21 22 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 20 def task_api @task_api end |
#workflow_api ⇒ Object (readonly)
Returns the value of attribute workflow_api.
20 21 22 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 20 def workflow_api @workflow_api end |
Instance Method Details
#execute(name, input: {}, version: nil, correlation_id: nil, domain: nil, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun
Execute a workflow by name with input (convenience method)
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 99 def execute(name, input: {}, version: nil, correlation_id: nil, domain: nil, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) request = Http::Models::StartWorkflowRequest.new( name: name, input: input, version: version, correlation_id: correlation_id ) request.task_to_domain = { '*' => domain } if domain execute_workflow( request, wait_until_task_ref: wait_until_task_ref, wait_for_seconds: wait_for_seconds, request_id: request_id ) end |
#execute_and_wait(name, input: {}, timeout_seconds: 60, **options) ⇒ Workflow
Execute a workflow and wait for completion (with polling fallback)
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 319 def execute_and_wait(name, input: {}, timeout_seconds: 60, **) # First try synchronous execution result = execute( name, input: input, wait_for_seconds: [timeout_seconds, 30].min, # Server-side wait capped at 30s typically ** ) # If still running, poll for completion if result.running? wait_for_workflow(result.workflow_id, timeout_seconds: timeout_seconds) else get_workflow(result.workflow_id) end end |
#execute_workflow(request, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) ⇒ WorkflowRun
Execute a workflow synchronously and wait for completion
76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 76 def execute_workflow(request, wait_until_task_ref: nil, wait_for_seconds: 10, request_id: nil) request_id ||= SecureRandom.uuid @workflow_api.execute_workflow( request, name: request.name, version: request.version || 1, request_id: request_id, wait_until_task_ref: wait_until_task_ref, wait_for_seconds: wait_for_seconds ) end |
#get_by_correlation_id(workflow_name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>
Get workflows by correlation ID
261 262 263 264 265 266 267 268 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 261 def get_by_correlation_id(workflow_name, correlation_id, include_closed: false, include_tasks: false) @workflow_api.get_workflows( workflow_name, correlation_id, include_closed: include_closed, include_tasks: include_tasks ) end |
#get_by_correlation_ids(workflow_name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>
Get workflows by multiple correlation IDs
276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 276 def get_by_correlation_ids(workflow_name, correlation_ids, include_closed: false, include_tasks: false) # NOTE: This would require a batch API endpoint; for now, iterate result = {} correlation_ids.each do |correlation_id| result[correlation_id] = get_by_correlation_id( workflow_name, correlation_id, include_closed: include_closed, include_tasks: include_tasks ) end result end |
#get_task(task_id) ⇒ Task
Get a task by ID
247 248 249 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 247 def get_task(task_id) @task_api.get_task(task_id) end |
#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow
Get workflow execution details
125 126 127 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 125 def get_workflow(workflow_id, include_tasks: true) @workflow_api.get_execution_status(workflow_id, include_tasks: include_tasks) end |
#get_workflow_status(workflow_id, include_output: false, include_variables: false) ⇒ Hash
Get workflow status (lightweight)
134 135 136 137 138 139 140 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 134 def get_workflow_status(workflow_id, include_output: false, include_variables: false) @workflow_api.get_workflow_status( workflow_id, include_output: include_output, include_variables: include_variables ) end |
#pause(workflow_id) ⇒ void
This method returns an undefined value.
Pause a running workflow
149 150 151 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 149 def pause(workflow_id) @workflow_api.pause_workflow(workflow_id) end |
#register_workflow(workflow, overwrite: true) ⇒ Object
Register a workflow definition
43 44 45 46 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 43 def register_workflow(workflow, overwrite: true) workflow_def = workflow.respond_to?(:to_workflow_def) ? workflow.to_workflow_def : workflow @metadata_api.update_workflows([workflow_def], overwrite: overwrite) end |
#remove_workflow(workflow_id, archive_workflow: true) ⇒ void
This method returns an undefined value.
Remove (delete) a workflow permanently
196 197 198 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 196 def remove_workflow(workflow_id, archive_workflow: true) @workflow_api.delete(workflow_id, archive_workflow: archive_workflow) end |
#rerun(workflow_id, rerun_request) ⇒ String
Rerun a workflow from a specific task
188 189 190 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 188 def rerun(workflow_id, rerun_request) @workflow_api.rerun(workflow_id, rerun_request) end |
#restart(workflow_id, use_latest_definitions: false) ⇒ void
This method returns an undefined value.
Restart a completed workflow
172 173 174 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 172 def restart(workflow_id, use_latest_definitions: false) @workflow_api.restart(workflow_id, use_latest_def: use_latest_definitions) end |
#resume(workflow_id) ⇒ void
This method returns an undefined value.
Resume a paused workflow
156 157 158 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 156 def resume(workflow_id) @workflow_api.resume_workflow(workflow_id) end |
#retry(workflow_id, resume_subworkflow_tasks: false) ⇒ void
This method returns an undefined value.
Retry a failed workflow
180 181 182 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 180 def retry(workflow_id, resume_subworkflow_tasks: false) @workflow_api.retry(workflow_id, resume_subworkflow_tasks: resume_subworkflow_tasks) end |
#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void
This method returns an undefined value.
Skip a task in a running workflow
205 206 207 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 205 def skip_task_from_workflow(workflow_id, task_reference_name, request: nil) @workflow_api.skip_task_from_workflow(workflow_id, task_reference_name, request: request) end |
#start_workflow(request) ⇒ String
Start a workflow asynchronously (returns immediately with workflow ID)
55 56 57 58 59 60 61 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 55 def start_workflow(request) publish_workflow_input_size(request) @workflow_api.start_workflow(request) rescue StandardError => e publish_workflow_start_error(request, e) raise end |
#start_workflows(*requests) ⇒ Array<String>
Start multiple workflows
66 67 68 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 66 def start_workflows(*requests) requests.flatten.map { |request| start_workflow(request) } end |
#terminate(workflow_id, reason: nil) ⇒ void
This method returns an undefined value.
Terminate a running workflow
164 165 166 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 164 def terminate(workflow_id, reason: nil) @workflow_api.terminate(workflow_id, reason: reason) end |
#update_task(workflow_id, task_id, task_output, status) ⇒ String
Update a task result
219 220 221 222 223 224 225 226 227 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 219 def update_task(workflow_id, task_id, task_output, status) task_result = Http::Models::TaskResult.new( workflow_instance_id: workflow_id, task_id: task_id, output_data: task_output, status: status ) @task_api.update_task(task_result) end |
#update_task_by_ref_name(workflow_id, task_reference_name, task_output, status) ⇒ String
Update a task by reference name
235 236 237 238 239 240 241 242 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 235 def update_task_by_ref_name(workflow_id, task_reference_name, task_output, status) @task_api.update_task_by_ref_name( task_output, workflow_id: workflow_id, task_ref_name: task_reference_name, status: status ) end |
#wait_for_workflow(workflow_id, timeout_seconds: 60, poll_interval_seconds: 1.0) ⇒ Workflow
Wait for a workflow to complete
300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/conductor/workflow/workflow_executor.rb', line 300 def wait_for_workflow(workflow_id, timeout_seconds: 60, poll_interval_seconds: 1.0) deadline = Time.now + timeout_seconds loop do workflow = get_workflow(workflow_id, include_tasks: false) return workflow if workflow.terminal? raise Timeout::Error, "Workflow #{workflow_id} did not complete within #{timeout_seconds} seconds" if Time.now >= deadline sleep(poll_interval_seconds) end end |