Class: Conductor::Client::WorkflowClient

Inherits:
Object
Defined in:
lib/conductor/client/workflow_client.rb

Overview

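WorkflowClient is a high-level client for workflow operations. Each method delegates to an Http::Api::WorkflowResourceApi instance, which is exposed read-only as #workflow_api.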

Constructor Details

#initialize(configuration = nil) ⇒ WorkflowClient

Initialize WorkflowClient

Parameters:

  • configuration (Configuration) (defaults to: nil)

    Optional configuration; a new Configuration is created when omitted



# File 'lib/conductor/client/workflow_client.rb', line 16

def initialize(configuration = nil)
  @configuration = configuration || Configuration.new
  api_client = Http::ApiClient.new(configuration: @configuration)
  @workflow_api = Http::Api::WorkflowResourceApi.new(api_client)
end

Instance Attribute Details

#workflow_api ⇒ Object (readonly)

Returns the value of attribute workflow_api.



# File 'lib/conductor/client/workflow_client.rb', line 12

def workflow_api
  @workflow_api
end

Instance Method Details

#delete_workflow(workflow_id, archive_workflow: true) ⇒ void

This method returns an undefined value.

Delete a workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • archive_workflow (Boolean) (defaults to: true)

    Archive workflow before deleting (default: true)



# File 'lib/conductor/client/workflow_client.rb', line 62

def delete_workflow(workflow_id, archive_workflow: true)
  @workflow_api.delete(workflow_id, archive_workflow: archive_workflow)
end

#execute_workflow(request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30) ⇒ WorkflowRun

Execute a workflow synchronously and wait for completion

Parameters:

  • request (StartWorkflowRequest)

    Start workflow request; its version falls back to 1 when unset

  • request_id (String) (defaults to: nil)

    Unique request ID (optional; generated with SecureRandom.uuid when omitted)

  • wait_until_task_ref (String) (defaults to: nil)

    Reference name of the task to wait for (optional)

  • wait_for_seconds (Integer) (defaults to: 30)

    Maximum wait time in seconds (default: 30)

Returns:

  • (WorkflowRun)

    Workflow run result



# File 'lib/conductor/client/workflow_client.rb', line 162

def execute_workflow(request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30)
  request_id ||= SecureRandom.uuid
  @workflow_api.execute_workflow(
    request,
    name: request.name,
    version: request.version || 1,
    request_id: request_id,
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  )
end
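
The defaulting in execute_workflow can be looked at in isolation with a small sketch. `Request`, `FakeWorkflowApi`, and `execute_with_defaults` below are hypothetical stand-ins (not part of the library), used so the example runs without a Conductor server; only the `request_id ||= SecureRandom.uuid` and `request.version || 1` logic is taken from the listing above.

```ruby
require 'securerandom'

# Hypothetical stand-in for StartWorkflowRequest; only name/version are used here.
Request = Struct.new(:name, :version, keyword_init: true)

# Hypothetical stand-in for WorkflowResourceApi that records the options it receives.
class FakeWorkflowApi
  attr_reader :last_call

  def execute_workflow(_request, **opts)
    @last_call = opts
    :workflow_run # placeholder for a WorkflowRun
  end
end

# Same defaulting as the listing above, with the API object passed in explicitly.
def execute_with_defaults(api, request, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 30)
  request_id ||= SecureRandom.uuid
  api.execute_workflow(
    request,
    name: request.name,
    version: request.version || 1,
    request_id: request_id,
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  )
end

api = FakeWorkflowApi.new
execute_with_defaults(api, Request.new(name: 'order_flow'))
puts api.last_call[:version]     # version falls back to 1 when the request has none
puts api.last_call[:request_id]  # a freshly generated UUID string
```

Passing an explicit `request_id:` skips the UUID generation, which may be useful when the caller wants to correlate the request with its own logs.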

#get_by_correlation_id(name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>

Get workflows by correlation ID

Parameters:

  • name (String)

    Workflow name

  • correlation_id (String)

    Correlation ID

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Array<Workflow>)

    List of workflows



# File 'lib/conductor/client/workflow_client.rb', line 118

def get_by_correlation_id(name, correlation_id, include_closed: false, include_tasks: false)
  @workflow_api.get_workflows(name, correlation_id, include_closed: include_closed, include_tasks: include_tasks)
end

#get_by_correlation_ids(name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>

Get workflows by multiple correlation IDs (batch)

Parameters:

  • name (String)

    Workflow name

  • correlation_ids (Array<String>)

    Correlation IDs

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Hash<String, Array<Workflow>>)



# File 'lib/conductor/client/workflow_client.rb', line 190

def get_by_correlation_ids(name, correlation_ids, include_closed: false, include_tasks: false)
  @workflow_api.get_workflows_batch(name, correlation_ids, include_closed: include_closed,
                                                           include_tasks: include_tasks)
end

#get_running_workflows(name, version: nil, start_time: nil, end_time: nil) ⇒ Array<String>

Get the IDs of running workflows by name

Parameters:

  • name (String)

    Workflow name

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

  • start_time (Integer) (defaults to: nil)

    Start time in epoch millis (optional)

  • end_time (Integer) (defaults to: nil)

    End time in epoch millis (optional)

Returns:

  • (Array<String>)

    List of workflow IDs



# File 'lib/conductor/client/workflow_client.rb', line 128

def get_running_workflows(name, version: nil, start_time: nil, end_time: nil)
  @workflow_api.get_running_workflow(name, version: version, start_time: start_time, end_time: end_time)
end
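
start_time and end_time are epoch milliseconds, while Ruby's Time#to_i returns seconds. A minimal sketch of the conversion follows; the helper name `epoch_millis` is hypothetical and not part of the client.

```ruby
# Convert a Time to integer milliseconds since the Unix epoch.
# `epoch_millis` is a hypothetical helper name, not part of WorkflowClient.
def epoch_millis(time)
  (time.to_r * 1000).to_i
end

window_end   = Time.at(1_700_000_000) # fixed instant so the output is reproducible
window_start = window_end - 3600      # one hour earlier

puts epoch_millis(window_start) # 1699996400000
puts epoch_millis(window_end)   # 1700000000000
```

The two values could then be passed as `start_time:` and `end_time:` to get_running_workflows.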

#get_workflow(workflow_id, include_tasks: true) ⇒ Workflow

Get workflow execution status

Parameters:

  • workflow_id (String)

    Workflow ID

  • include_tasks (Boolean) (defaults to: true)

    Include task details (default: true)

Returns:

  • (Workflow)



# File 'lib/conductor/client/workflow_client.rb', line 54

def get_workflow(workflow_id, include_tasks: true)
  @workflow_api.get_execution_status(workflow_id, include_tasks: include_tasks)
end
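
A common use of get_workflow is polling until the execution finishes. The sketch below is one possible way to do that, not a helper shipped with the client. It assumes the returned Workflow responds to `#status` with Conductor's status names; `wait_for_workflow`, `FakeWorkflow`, and `FakePollingClient` are hypothetical and exist only so the example runs on its own.

```ruby
# Conductor workflow statuses that no longer change on their own (assumed names).
TERMINAL_STATUSES = %w[COMPLETED FAILED TIMED_OUT TERMINATED].freeze

# Poll get_workflow until a terminal status is seen or max_attempts is reached.
# Hypothetical helper; assumes the returned object responds to #status.
def wait_for_workflow(client, workflow_id, interval: 1.0, max_attempts: 30)
  max_attempts.times do
    workflow = client.get_workflow(workflow_id, include_tasks: false)
    return workflow if TERMINAL_STATUSES.include?(workflow.status)

    sleep(interval)
  end
  nil # still not terminal after max_attempts polls
end

# Hypothetical stand-ins so the sketch runs without a server.
FakeWorkflow = Struct.new(:status)

class FakePollingClient
  def initialize(statuses)
    @statuses = statuses.dup
  end

  # Returns the scripted statuses in order, repeating the last one forever.
  def get_workflow(_workflow_id, include_tasks: true)
    FakeWorkflow.new(@statuses.size > 1 ? @statuses.shift : @statuses.first)
  end
end

client = FakePollingClient.new(%w[RUNNING RUNNING COMPLETED])
puts wait_for_workflow(client, 'wf-1', interval: 0).status
```

Passing `include_tasks: false` keeps each poll lighter when only the status is needed.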

#get_workflow_def(name, version: nil) ⇒ WorkflowDef

Get a workflow definition

Parameters:

  • name (String)

    Workflow name

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

Returns:

  • (WorkflowDef)

    Workflow definition



# File 'lib/conductor/client/workflow_client.rb', line 144

def get_workflow_def(name, version: nil)
  @workflow_api.get_workflow_def(name, version: version)
end

#pause_workflow(workflow_id) ⇒ void

This method returns an undefined value.

Pause a workflow

Parameters:

  • workflow_id (String)

    Workflow ID



# File 'lib/conductor/client/workflow_client.rb', line 77

def pause_workflow(workflow_id)
  @workflow_api.pause_workflow(workflow_id)
end

#register_workflow(workflow_def, overwrite: false) ⇒ void

This method returns an undefined value.

Register a workflow definition

Parameters:

  • workflow_def (WorkflowDef)

    Workflow definition to register

  • overwrite (Boolean) (defaults to: false)

    Overwrite existing definition (default: false)



# File 'lib/conductor/client/workflow_client.rb', line 136

def register_workflow(workflow_def, overwrite: false)
  @workflow_api.register_workflow(workflow_def, overwrite: overwrite)
end

#remove_workflow(workflow_id, archive_workflow: true) ⇒ void

This method returns an undefined value.

Remove (permanently delete) a workflow. Delegates to the same API call as #delete_workflow.

Parameters:

  • workflow_id (String)

    Workflow ID

  • archive_workflow (Boolean) (defaults to: true)

    Archive before deleting (default: true)



# File 'lib/conductor/client/workflow_client.rb', line 237

def remove_workflow(workflow_id, archive_workflow: true)
  @workflow_api.delete(workflow_id, archive_workflow: archive_workflow)
end

#rerun_workflow(workflow_id, rerun_request) ⇒ String

Rerun a workflow from a specific task

Parameters:

  • workflow_id (String)

    Workflow ID

  • rerun_request (RerunWorkflowRequest)

    Rerun request

Returns:

  • (String)

    New workflow ID



# File 'lib/conductor/client/workflow_client.rb', line 108

def rerun_workflow(workflow_id, rerun_request)
  @workflow_api.rerun(workflow_id, rerun_request)
end

#restart_workflow(workflow_id, use_latest_def: false) ⇒ void

This method returns an undefined value.

Restart a completed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • use_latest_def (Boolean) (defaults to: false)

    Use latest workflow definition (default: false)



# File 'lib/conductor/client/workflow_client.rb', line 92

def restart_workflow(workflow_id, use_latest_def: false)
  @workflow_api.restart(workflow_id, use_latest_def: use_latest_def)
end

#resume_workflow(workflow_id) ⇒ void

This method returns an undefined value.

Resume a paused workflow

Parameters:

  • workflow_id (String)

    Workflow ID



# File 'lib/conductor/client/workflow_client.rb', line 84

def resume_workflow(workflow_id)
  @workflow_api.resume_workflow(workflow_id)
end
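
pause_workflow and resume_workflow are often used as a pair. One way to make sure a paused workflow is always resumed is a block helper, sketched below. `with_paused_workflow` and `RecordingClient` are hypothetical names introduced for illustration; only the two client method names come from this page.

```ruby
# Pause a workflow for the duration of a block and resume it afterwards,
# even if the block raises. Hypothetical helper, not part of WorkflowClient.
def with_paused_workflow(client, workflow_id)
  client.pause_workflow(workflow_id)
  begin
    yield
  ensure
    client.resume_workflow(workflow_id)
  end
end

# Hypothetical stand-in client that records the calls it receives.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def pause_workflow(workflow_id)
    @calls << [:pause, workflow_id]
  end

  def resume_workflow(workflow_id)
    @calls << [:resume, workflow_id]
  end
end

client = RecordingClient.new
with_paused_workflow(client, 'wf-1') { puts 'maintenance while paused' }
p client.calls
```

The resume call sits in an inner `ensure` so that it only runs once the pause itself has succeeded.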

#retry_workflow(workflow_id, resume_subworkflow_tasks: false) ⇒ void

This method returns an undefined value.

Retry a failed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • resume_subworkflow_tasks (Boolean) (defaults to: false)

    Resume subworkflow tasks (default: false)



# File 'lib/conductor/client/workflow_client.rb', line 100

def retry_workflow(workflow_id, resume_subworkflow_tasks: false)
  @workflow_api.retry(workflow_id, resume_subworkflow_tasks: resume_subworkflow_tasks)
end

#search(start: 0, size: 100, free_text: '*', query: nil) ⇒ SearchResult

Search for workflows

Parameters:

  • start (Integer) (defaults to: 0)

    Start index (default: 0)

  • size (Integer) (defaults to: 100)

    Page size (default: 100)

  • free_text (String) (defaults to: '*')

    Free text search (default: '*')

  • query (String) (defaults to: nil)

    Query string (optional)

Returns:

  • (SearchResult)

    Search results



# File 'lib/conductor/client/workflow_client.rb', line 180

def search(start: 0, size: 100, free_text: '*', query: nil)
  @workflow_api.search(start: start, size: size, free_text: free_text, query: query)
end
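
Because search is paged through start and size, collecting every match means calling it repeatedly. The following is a minimal sketch of such a loop. It assumes SearchResult exposes `results` and `total_hits`, which this page does not confirm; `Page`, `FakeSearchClient`, and `search_all` are hypothetical stand-ins so the example runs without a server.

```ruby
# Hypothetical stand-in for SearchResult; the real accessors may differ.
Page = Struct.new(:total_hits, :results, keyword_init: true)

# Hypothetical stand-in client that serves slices of an in-memory list.
class FakeSearchClient
  def initialize(items)
    @items = items
  end

  def search(start: 0, size: 100, free_text: '*', query: nil)
    Page.new(total_hits: @items.size, results: @items[start, size] || [])
  end
end

# Collect all results by advancing `start` in steps of `size`.
def search_all(client, size: 100, **filters)
  collected = []
  start = 0
  loop do
    page = client.search(start: start, size: size, **filters)
    collected.concat(page.results)
    start += size
    break if page.results.empty? || start >= page.total_hits
  end
  collected
end

client = FakeSearchClient.new((1..5).map { |i| "wf-#{i}" })
puts search_all(client, size: 2).inspect
```

The loop stops on an empty page as well as on the reported total, so it still terminates if the total changes between calls.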

#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void

This method returns an undefined value.

Skip a task in a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_reference_name (String)

    Task reference name

  • request (SkipTaskRequest) (defaults to: nil)

    Skip task request (optional)



# File 'lib/conductor/client/workflow_client.rb', line 229

def skip_task_from_workflow(workflow_id, task_reference_name, request: nil)
  @workflow_api.skip_task_from_workflow(workflow_id, task_reference_name, request: request)
end

#start(name_or_request, input: {}, version: nil, correlation_id: nil) ⇒ String

Start a workflow with name and input, or with a StartWorkflowRequest

Parameters:

  • name_or_request (String, StartWorkflowRequest)

    Workflow name or StartWorkflowRequest

  • input (Hash) (defaults to: {})

    Workflow input data (default: {}; ignored if a request object is passed)

  • version (Integer) (defaults to: nil)

    Workflow version (optional; ignored if a request object is passed)

  • correlation_id (String) (defaults to: nil)

    Correlation ID (optional; ignored if a request object is passed)

Returns:

  • (String)

    Workflow ID



# File 'lib/conductor/client/workflow_client.rb', line 35

def start(name_or_request, input: {}, version: nil, correlation_id: nil)
  # Handle both StartWorkflowRequest objects and simple name/input arguments
  if name_or_request.is_a?(Http::Models::StartWorkflowRequest)
    start_workflow(name_or_request)
  else
    request = Http::Models::StartWorkflowRequest.new(
      name: name_or_request,
      input: input,
      version: version,
      correlation_id: correlation_id
    )
    start_workflow(request)
  end
end
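
The two calling styles accepted by start can be illustrated with a reduced sketch. `StartRequest` and `SketchClient` are hypothetical stand-ins for Http::Models::StartWorkflowRequest and WorkflowClient; here start_workflow simply returns the request it would have sent, so the effect of each style is visible.

```ruby
# Hypothetical stand-in for Http::Models::StartWorkflowRequest.
StartRequest = Struct.new(:name, :input, :version, :correlation_id, keyword_init: true)

# Reduced stand-in client: start_workflow echoes the request instead of sending it.
class SketchClient
  def start_workflow(request)
    request
  end

  # Same branching as the listing above: pass a request through, or build one.
  def start(name_or_request, input: {}, version: nil, correlation_id: nil)
    return start_workflow(name_or_request) if name_or_request.is_a?(StartRequest)

    start_workflow(
      StartRequest.new(name: name_or_request, input: input,
                       version: version, correlation_id: correlation_id)
    )
  end
end

client = SketchClient.new

# Style 1: workflow name plus keyword arguments.
by_name = client.start('order_flow', input: { 'order_id' => 42 }, version: 2)

# Style 2: a prebuilt request; the keyword arguments are ignored.
prebuilt = StartRequest.new(name: 'order_flow', input: { 'order_id' => 7 })
by_request = client.start(prebuilt, input: { 'ignored' => true })

puts by_name.input.inspect
puts by_request.input.inspect
```

Since keyword arguments are silently dropped in the second style, any input, version, or correlation ID has to be set on the request object itself.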

#start_workflow(request) ⇒ String

Start a new workflow

Parameters:

  • request (StartWorkflowRequest)

    Start workflow request

Returns:

  • (String)

    Workflow ID



# File 'lib/conductor/client/workflow_client.rb', line 25

def start_workflow(request)
  @workflow_api.start_workflow(request)
end

#terminate_workflow(workflow_id, reason: nil) ⇒ void

This method returns an undefined value.

Terminate a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • reason (String) (defaults to: nil)

    Termination reason (optional)



# File 'lib/conductor/client/workflow_client.rb', line 70

def terminate_workflow(workflow_id, reason: nil)
  @workflow_api.terminate(workflow_id, reason: reason)
end

#test_workflow(request) ⇒ Workflow

Test a workflow with mocked task outputs

Parameters:

  • request (WorkflowTestRequest)

    Test request

Returns:

  • (Workflow)



# File 'lib/conductor/client/workflow_client.rb', line 220

def test_workflow(request)
  @workflow_api.test_workflow(request)
end

#unregister_workflow(name, version:) ⇒ void

This method returns an undefined value.

Delete a workflow definition

Parameters:

  • name (String)

    Workflow name

  • version (Integer)

    Workflow version



# File 'lib/conductor/client/workflow_client.rb', line 152

def unregister_workflow(name, version:)
  @workflow_api.unregister_workflow(name, version: version)
end

#update_state(workflow_id, state_update, wait_until_task_ref: nil, wait_for_seconds: 10) ⇒ WorkflowRun

Update workflow and task state

Parameters:

  • workflow_id (String)

    Workflow ID

  • state_update (WorkflowStateUpdate)

    State update request

  • wait_until_task_ref (String) (defaults to: nil)

    Reference name of the task to wait for (optional)

  • wait_for_seconds (Integer) (defaults to: 10)

    Wait time in seconds (default: 10)

Returns:

  • (WorkflowRun)


# File 'lib/conductor/client/workflow_client.rb', line 209

def update_state(workflow_id, state_update, wait_until_task_ref: nil, wait_for_seconds: 10)
  @workflow_api.update_workflow_and_task_state(
    workflow_id, state_update,
    wait_until_task_ref: wait_until_task_ref,
    wait_for_seconds: wait_for_seconds
  )
end

#update_variables(workflow_id, variables) ⇒ Workflow

Update workflow variables

Parameters:

  • workflow_id (String)

    Workflow ID

  • variables (Hash)

    Variables to update

Returns:

  • (Workflow)



# File 'lib/conductor/client/workflow_client.rb', line 199

def update_variables(workflow_id, variables)
  @workflow_api.update_workflow_state(workflow_id, variables)
end