Class: Conductor::Client::WorkflowClient
- Inherits: Object
- Inheritance hierarchy: Object → Conductor::Client::WorkflowClient
- Defined in:
- lib/conductor/client/workflow_client.rb
Overview
WorkflowClient - High-level client for workflow operations
Instance Attribute Summary
-
#workflow_api ⇒ Object
readonly
Returns the value of attribute workflow_api.
Instance Method Summary
-
#delete_workflow(workflow_id, archive_workflow: true) ⇒ void
Delete a workflow.
-
#execute_workflow(request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30) ⇒ WorkflowRun
Execute a workflow synchronously and wait for completion.
-
#get_by_correlation_id(name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>
Get workflows by correlation ID.
-
#get_by_correlation_ids(name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>
Get workflows by multiple correlation IDs (batch).
-
#get_running_workflows(name, version: nil, start_time: nil, end_time: nil) ⇒ Array<String>
Get running workflows by name.
-
#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow
Get workflow execution status.
-
#get_workflow_def(name, version: nil) ⇒ WorkflowDef
Get a workflow definition.
-
#initialize(configuration = nil) ⇒ WorkflowClient
constructor
Initialize WorkflowClient.
-
#pause_workflow(workflow_id) ⇒ void
Pause a workflow.
-
#register_workflow(workflow_def, overwrite: false) ⇒ void
Register a workflow definition.
-
#remove_workflow(workflow_id, archive_workflow: true) ⇒ void
Remove (permanently delete) a workflow.
-
#rerun_workflow(workflow_id, rerun_request) ⇒ String
Rerun a workflow from a specific task.
-
#restart_workflow(workflow_id, use_latest_def: false) ⇒ void
Restart a completed workflow.
-
#resume_workflow(workflow_id) ⇒ void
Resume a paused workflow.
-
#retry_workflow(workflow_id, resume_subworkflow_tasks: false) ⇒ void
Retry a failed workflow.
-
#search(start: 0, size: 100, free_text: '*', query: nil) ⇒ SearchResult
Search for workflows.
-
#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void
Skip a task in a running workflow.
-
#start(name_or_request, input: {}, version: nil, correlation_id: nil) ⇒ String
Start a workflow with name and input, or with a StartWorkflowRequest.
-
#start_workflow(request) ⇒ String
Start a new workflow.
-
#terminate_workflow(workflow_id, reason: nil) ⇒ void
Terminate a running workflow.
-
#test_workflow(request) ⇒ Workflow
Test a workflow with mocked task outputs.
-
#unregister_workflow(name, version:) ⇒ void
Delete a workflow definition.
-
#update_state(workflow_id, state_update, wait_until_task_ref: nil, wait_for_seconds: 10) ⇒ WorkflowRun
Update workflow and task state.
-
#update_variables(workflow_id, variables) ⇒ Workflow
Update workflow variables.
Constructor Details
#initialize(configuration = nil) ⇒ WorkflowClient
Initialize WorkflowClient
16 17 18 19 20 |
# File 'lib/conductor/client/workflow_client.rb', line 16

# Initialize WorkflowClient.
#
# Stores the effective configuration in @configuration and builds the
# workflow resource API (on top of a new HTTP API client) in @workflow_api.
#
# @param configuration [Configuration, nil] settings handed to the HTTP API
#   client; when nil (or otherwise falsy) a fresh Configuration.new is used
# @return [WorkflowClient]
def initialize(configuration = nil)
  @configuration = configuration || Configuration.new
  @workflow_api = Http::Api::WorkflowResourceApi.new(
    Http::ApiClient.new(configuration: @configuration)
  )
end
Instance Attribute Details
#workflow_api ⇒ Object (readonly)
Returns the value of attribute workflow_api.
12 13 14 |
# File 'lib/conductor/client/workflow_client.rb', line 12

# Read-only accessor for the workflow resource API object.
#
# @return [Object] the current value of @workflow_api
def workflow_api
  @workflow_api
end
Instance Method Details
#delete_workflow(workflow_id, archive_workflow: true) ⇒ void
This method returns an undefined value.
Delete a workflow
62 63 64 |
# File 'lib/conductor/client/workflow_client.rb', line 62

# Delete a workflow.
#
# Thin wrapper around @workflow_api.delete.
#
# @param workflow_id passed through as the first positional argument
# @param archive_workflow forwarded as the archive_workflow: keyword
#   (defaults to true)
# @return [void]
def delete_workflow(workflow_id, archive_workflow: true)
  @workflow_api.delete(
    workflow_id,
    archive_workflow: archive_workflow
  )
end
#execute_workflow(request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30) ⇒ WorkflowRun
Execute a workflow synchronously and wait for completion
162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/conductor/client/workflow_client.rb', line 162

# Execute a workflow synchronously and wait for completion.
#
# @param request start request; must respond to #name and #version, which
#   are forwarded as the name: and version: keywords
# @param request_id forwarded as request_id:; when nil (or falsy) a
#   SecureRandom.uuid is generated instead
# @param wait_until_task_ref forwarded unchanged (defaults to nil)
# @param wait_for_seconds forwarded unchanged (defaults to 30)
# @return [WorkflowRun]
def execute_workflow(request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30)
  # Resolve the request id first so the evaluation order matches the
  # original listing (id defaulting happens before the request is read).
  effective_request_id = request_id || SecureRandom.uuid

  # A request without an explicit version is executed as version 1.
  effective_version = request.version || 1

  call_options = {
    name: request.name,
    version: effective_version,
    request_id: effective_request_id,
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  }
  @workflow_api.execute_workflow(request, **call_options)
end
#get_by_correlation_id(name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>
Get workflows by correlation ID
118 119 120 |
# File 'lib/conductor/client/workflow_client.rb', line 118

# Get workflows by correlation ID.
#
# Thin wrapper around @workflow_api.get_workflows.
#
# @param name first positional argument of the API call
# @param correlation_id second positional argument of the API call
# @param include_closed forwarded as include_closed: (defaults to false)
# @param include_tasks forwarded as include_tasks: (defaults to false)
# @return [Array<Workflow>]
def get_by_correlation_id(name, correlation_id, include_closed: false, include_tasks: false)
  filters = { include_closed: include_closed, include_tasks: include_tasks }
  @workflow_api.get_workflows(name, correlation_id, **filters)
end
#get_by_correlation_ids(name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>
Get workflows by multiple correlation IDs (batch)
190 191 192 193 |
# File 'lib/conductor/client/workflow_client.rb', line 190

# Get workflows by multiple correlation IDs (batch).
#
# Thin wrapper around @workflow_api.get_workflows_batch.
#
# @param name first positional argument of the API call
# @param correlation_ids second positional argument of the API call
# @param include_closed forwarded as include_closed: (defaults to false)
# @param include_tasks forwarded as include_tasks: (defaults to false)
# @return [Hash<String, Array<Workflow>>]
def get_by_correlation_ids(name, correlation_ids, include_closed: false, include_tasks: false)
  filters = { include_closed: include_closed, include_tasks: include_tasks }
  @workflow_api.get_workflows_batch(name, correlation_ids, **filters)
end
#get_running_workflows(name, version: nil, start_time: nil, end_time: nil) ⇒ Array<String>
Get running workflows by name
128 129 130 |
# File 'lib/conductor/client/workflow_client.rb', line 128

# Get running workflows by name.
#
# Thin wrapper around @workflow_api.get_running_workflow.
#
# @param name passed through as the only positional argument
# @param version forwarded as version: (defaults to nil)
# @param start_time forwarded as start_time: (defaults to nil)
# @param end_time forwarded as end_time: (defaults to nil)
# @return [Array<String>]
def get_running_workflows(name, version: nil, start_time: nil, end_time: nil)
  @workflow_api.get_running_workflow(
    name,
    version: version,
    start_time: start_time,
    end_time: end_time
  )
end
#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow
Get workflow execution status
54 55 56 |
# File 'lib/conductor/client/workflow_client.rb', line 54

# Get workflow execution status.
#
# Thin wrapper around @workflow_api.get_execution_status.
#
# @param workflow_id passed through as the only positional argument
# @param include_tasks forwarded as include_tasks: (defaults to true)
# @return [Workflow]
def get_workflow(workflow_id, include_tasks: true)
  @workflow_api.get_execution_status(
    workflow_id,
    include_tasks: include_tasks
  )
end
#get_workflow_def(name, version: nil) ⇒ WorkflowDef
Get a workflow definition
144 145 146 |
# File 'lib/conductor/client/workflow_client.rb', line 144

# Get a workflow definition.
#
# Thin wrapper around @workflow_api.get_workflow_def.
#
# @param name passed through as the only positional argument
# @param version forwarded as version: (defaults to nil)
# @return [WorkflowDef]
def get_workflow_def(name, version: nil)
  @workflow_api.get_workflow_def(
    name,
    version: version
  )
end
#pause_workflow(workflow_id) ⇒ void
This method returns an undefined value.
Pause a workflow
77 78 79 |
# File 'lib/conductor/client/workflow_client.rb', line 77

# Pause a workflow.
#
# Thin wrapper around @workflow_api.pause_workflow.
#
# @param workflow_id passed through as the only argument
# @return [void]
def pause_workflow(workflow_id)
  api = @workflow_api
  api.pause_workflow(workflow_id)
end
#register_workflow(workflow_def, overwrite: false) ⇒ void
This method returns an undefined value.
Register a workflow definition
136 137 138 |
# File 'lib/conductor/client/workflow_client.rb', line 136

# Register a workflow definition.
#
# Thin wrapper around @workflow_api.register_workflow.
#
# @param workflow_def passed through as the only positional argument
# @param overwrite forwarded as overwrite: (defaults to false)
# @return [void]
def register_workflow(workflow_def, overwrite: false)
  @workflow_api.register_workflow(
    workflow_def,
    overwrite: overwrite
  )
end
#remove_workflow(workflow_id, archive_workflow: true) ⇒ void
This method returns an undefined value.
Remove (permanently delete) a workflow
237 238 239 |
# File 'lib/conductor/client/workflow_client.rb', line 237

# Remove (permanently delete) a workflow.
#
# Issues the same @workflow_api.delete call, with the same arguments, as
# #delete_workflow.
#
# @param workflow_id passed through as the first positional argument
# @param archive_workflow forwarded as the archive_workflow: keyword
#   (defaults to true)
# @return [void]
def remove_workflow(workflow_id, archive_workflow: true)
  @workflow_api.delete(
    workflow_id,
    archive_workflow: archive_workflow
  )
end
#rerun_workflow(workflow_id, rerun_request) ⇒ String
Rerun a workflow from a specific task
108 109 110 |
# File 'lib/conductor/client/workflow_client.rb', line 108

# Rerun a workflow from a specific task.
#
# Thin wrapper around @workflow_api.rerun.
#
# @param workflow_id first positional argument of the API call
# @param rerun_request second positional argument of the API call
# @return [String]
def rerun_workflow(workflow_id, rerun_request)
  api = @workflow_api
  api.rerun(workflow_id, rerun_request)
end
#restart_workflow(workflow_id, use_latest_def: false) ⇒ void
This method returns an undefined value.
Restart a completed workflow
92 93 94 |
# File 'lib/conductor/client/workflow_client.rb', line 92

# Restart a completed workflow.
#
# Thin wrapper around @workflow_api.restart.
#
# @param workflow_id passed through as the only positional argument
# @param use_latest_def forwarded as use_latest_def: (defaults to false)
# @return [void]
def restart_workflow(workflow_id, use_latest_def: false)
  @workflow_api.restart(
    workflow_id,
    use_latest_def: use_latest_def
  )
end
#resume_workflow(workflow_id) ⇒ void
This method returns an undefined value.
Resume a paused workflow
84 85 86 |
# File 'lib/conductor/client/workflow_client.rb', line 84

# Resume a paused workflow.
#
# Thin wrapper around @workflow_api.resume_workflow.
#
# @param workflow_id passed through as the only argument
# @return [void]
def resume_workflow(workflow_id)
  api = @workflow_api
  api.resume_workflow(workflow_id)
end
#retry_workflow(workflow_id, resume_subworkflow_tasks: false) ⇒ void
This method returns an undefined value.
Retry a failed workflow
100 101 102 |
# File 'lib/conductor/client/workflow_client.rb', line 100

# Retry a failed workflow.
#
# Thin wrapper around @workflow_api.retry.
#
# @param workflow_id passed through as the only positional argument
# @param resume_subworkflow_tasks forwarded as resume_subworkflow_tasks:
#   (defaults to false)
# @return [void]
def retry_workflow(workflow_id, resume_subworkflow_tasks: false)
  @workflow_api.retry(
    workflow_id,
    resume_subworkflow_tasks: resume_subworkflow_tasks
  )
end
#search(start: 0, size: 100, free_text: '*', query: nil) ⇒ SearchResult
Search for workflows
180 181 182 |
# File 'lib/conductor/client/workflow_client.rb', line 180

# Search for workflows.
#
# Thin wrapper around @workflow_api.search; every keyword is forwarded
# under the same name.
#
# @param start forwarded as start: (defaults to 0)
# @param size forwarded as size: (defaults to 100)
# @param free_text forwarded as free_text: (defaults to '*')
# @param query forwarded as query: (defaults to nil)
# @return [SearchResult]
def search(start: 0, size: 100, free_text: '*', query: nil)
  criteria = {
    start: start,
    size: size,
    free_text: free_text,
    query: query
  }
  @workflow_api.search(**criteria)
end
#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void
This method returns an undefined value.
Skip a task in a running workflow
229 230 231 |
# File 'lib/conductor/client/workflow_client.rb', line 229

# Skip a task in a running workflow.
#
# Thin wrapper around @workflow_api.skip_task_from_workflow.
#
# @param workflow_id first positional argument of the API call
# @param task_reference_name second positional argument of the API call
# @param request forwarded as request: (defaults to nil)
# @return [void]
def skip_task_from_workflow(workflow_id, task_reference_name, request: nil)
  @workflow_api.skip_task_from_workflow(
    workflow_id,
    task_reference_name,
    request: request
  )
end
#start(name_or_request, input: {}, version: nil, correlation_id: nil) ⇒ String
Start a workflow with name and input, or with a StartWorkflowRequest
35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/conductor/client/workflow_client.rb', line 35

# Start a workflow with name and input, or with a StartWorkflowRequest.
#
# @param name_or_request either an Http::Models::StartWorkflowRequest, which
#   is started as-is, or a value used as the name: of a newly built request
# @param input used only when a request is built (defaults to {})
# @param version used only when a request is built (defaults to nil)
# @param correlation_id used only when a request is built (defaults to nil)
# @return [String]
def start(name_or_request, input: {}, version: nil, correlation_id: nil)
  # A ready-made request bypasses the keyword arguments entirely.
  return start_workflow(name_or_request) if name_or_request.is_a?(Http::Models::StartWorkflowRequest)

  start_workflow(
    Http::Models::StartWorkflowRequest.new(
      name: name_or_request,
      input: input,
      version: version,
      correlation_id: correlation_id
    )
  )
end
#start_workflow(request) ⇒ String
Start a new workflow
25 26 27 |
# File 'lib/conductor/client/workflow_client.rb', line 25

# Start a new workflow.
#
# Thin wrapper around @workflow_api.start_workflow.
#
# @param request passed through as the only argument
# @return [String]
def start_workflow(request)
  api = @workflow_api
  api.start_workflow(request)
end
#terminate_workflow(workflow_id, reason: nil) ⇒ void
This method returns an undefined value.
Terminate a running workflow
70 71 72 |
# File 'lib/conductor/client/workflow_client.rb', line 70

# Terminate a running workflow.
#
# Thin wrapper around @workflow_api.terminate.
#
# @param workflow_id passed through as the only positional argument
# @param reason forwarded as reason: (defaults to nil)
# @return [void]
def terminate_workflow(workflow_id, reason: nil)
  @workflow_api.terminate(
    workflow_id,
    reason: reason
  )
end
#test_workflow(request) ⇒ Workflow
Test a workflow with mocked task outputs
220 221 222 |
# File 'lib/conductor/client/workflow_client.rb', line 220

# Test a workflow with mocked task outputs.
#
# Thin wrapper around @workflow_api.test_workflow.
#
# @param request passed through as the only argument
# @return [Workflow]
def test_workflow(request)
  api = @workflow_api
  api.test_workflow(request)
end
#unregister_workflow(name, version:) ⇒ void
This method returns an undefined value.
Delete a workflow definition
152 153 154 |
# File 'lib/conductor/client/workflow_client.rb', line 152

# Delete a workflow definition.
#
# Thin wrapper around @workflow_api.unregister_workflow.
#
# @param name passed through as the only positional argument
# @param version required keyword, forwarded as version:
# @return [void]
def unregister_workflow(name, version:)
  @workflow_api.unregister_workflow(
    name,
    version: version
  )
end
#update_state(workflow_id, state_update, wait_until_task_ref: nil, wait_for_seconds: 10) ⇒ WorkflowRun
Update workflow and task state
209 210 211 212 213 214 215 |
# File 'lib/conductor/client/workflow_client.rb', line 209

# Update workflow and task state.
#
# Thin wrapper around @workflow_api.update_workflow_and_task_state.
#
# @param workflow_id first positional argument of the API call
# @param state_update second positional argument of the API call
# @param wait_until_task_ref forwarded unchanged (defaults to nil)
# @param wait_for_seconds forwarded unchanged (defaults to 10)
# @return [WorkflowRun]
def update_state(workflow_id, state_update, wait_until_task_ref: nil, wait_for_seconds: 10)
  wait_options = {
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  }
  @workflow_api.update_workflow_and_task_state(workflow_id, state_update, **wait_options)
end
#update_variables(workflow_id, variables) ⇒ Workflow
Update workflow variables
199 200 201 |
# File 'lib/conductor/client/workflow_client.rb', line 199

# Update workflow variables.
#
# Thin wrapper around @workflow_api.update_workflow_state.
#
# @param workflow_id first positional argument of the API call
# @param variables second positional argument of the API call
# @return [Workflow]
def update_variables(workflow_id, variables)
  api = @workflow_api
  api.update_workflow_state(workflow_id, variables)
end