Class: Conductor::Http::Api::WorkflowResourceApi

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/http/api/workflow_resource_api.rb

Overview

WorkflowResourceApi - API for workflow operations

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(api_client = nil) ⇒ WorkflowResourceApi

Initialize WorkflowResourceApi

Parameters:

  • api_client (ApiClient) (defaults to: nil)

    Optional API client



14
15
16
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 14

# Set up the resource with the API client it will send requests through.
#
# @param api_client [ApiClient, nil] Optional API client; when none is given
#   (nil or false) a fresh ApiClient is instantiated instead
def initialize(api_client = nil)
  @api_client = api_client
  @api_client ||= ApiClient.new
end

Instance Attribute Details

#api_client ⇒ Object

Returns the value of attribute api_client.



10
11
12
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 10

# Reader for the API client held in @api_client.
#
# @return [Object] the value of the api_client attribute
def api_client
  @api_client
end

Instance Method Details

#decide(workflow_id) ⇒ void

This method returns an undefined value.

Decide on workflow (evaluate next steps)

Parameters:

  • workflow_id (String)

    Workflow ID



285
286
287
288
289
290
291
292
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 285

# Decide on a workflow (evaluate next steps).
#
# Issues PUT /workflow/decide/{workflowId} through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @return [void]
def decide(workflow_id)
  request = {
    path_params: { workflowId: workflow_id },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/decide/{workflowId}', 'PUT', **request)
end

#delete(workflow_id, archive_workflow: true) ⇒ void

This method returns an undefined value.

Delete a workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • archive_workflow (Boolean) (defaults to: true)

    Archive workflow before deleting (default: true)



50
51
52
53
54
55
56
57
58
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 50

# Delete a workflow.
#
# Issues DELETE /workflow/{workflowId}/remove through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param archive_workflow [Boolean] Sent as the archiveWorkflow query parameter (default: true)
# @return [void]
def delete(workflow_id, archive_workflow: true)
  request = {
    path_params: { workflowId: workflow_id },
    query_params: { archiveWorkflow: archive_workflow },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/remove', 'DELETE', **request)
end

#execute_workflow(body, name:, version:, request_id:, wait_until_task_ref: nil, wait_for_seconds: 10) ⇒ WorkflowRun

Execute a workflow synchronously and wait for completion

Parameters:

  • body (StartWorkflowRequest)

    Start workflow request

  • name (String)

    Workflow name

  • version (Integer)

    Workflow version

  • request_id (String)

    Unique request ID for idempotency

  • wait_until_task_ref (String) (defaults to: nil)

    Wait until this task completes (optional)

  • wait_for_seconds (Integer) (defaults to: 10)

    Maximum time to wait (default: 10)

Returns:

  • (WorkflowRun)

    Workflow run result



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 233

# Execute a workflow synchronously and wait for completion.
#
# Issues POST /workflow/execute/{name}/{version} through the API client.
#
# @param body [StartWorkflowRequest] Start workflow request, sent as the request body
# @param name [String] Workflow name (path parameter)
# @param version [Integer] Workflow version (path parameter)
# @param request_id [String] Unique request ID for idempotency (requestId query parameter)
# @param wait_until_task_ref [String, nil] Wait until this task completes; the
#   waitUntilTaskRef query parameter is only sent when this is truthy
# @param wait_for_seconds [Integer] Maximum time to wait (default: 10)
# @return [WorkflowRun] Workflow run result
def execute_workflow(body, name:, version:, request_id:, wait_until_task_ref: nil, wait_for_seconds: 10)
  wait_target = wait_until_task_ref ? { waitUntilTaskRef: wait_until_task_ref } : {}
  request = {
    path_params: { name: name, version: version },
    query_params: { requestId: request_id, waitForSeconds: wait_for_seconds }.merge(wait_target),
    body: body,
    return_type: 'WorkflowRun',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/execute/{name}/{version}', 'POST', **request)
end

#get_execution_status(workflow_id, include_tasks: true) ⇒ Workflow

Get workflow execution status

Parameters:

  • workflow_id (String)

    Workflow ID

  • include_tasks (Boolean) (defaults to: true)

    Include task details (default: true)

Returns:

  • (Workflow)



35
36
37
38
39
40
41
42
43
44
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 35

# Get workflow execution status.
#
# Issues GET /workflow/{workflowId} through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param include_tasks [Boolean] Sent as the includeTasks query parameter (default: true)
# @return [Workflow] result requested with return type 'Workflow'
def get_execution_status(workflow_id, include_tasks: true)
  request = {
    path_params: { workflowId: workflow_id },
    query_params: { includeTasks: include_tasks },
    return_type: 'Workflow',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}', 'GET', **request)
end

#get_running_workflow(name, version: nil, start_time: nil, end_time: nil) ⇒ Array<String>

Get running workflows by name

Parameters:

  • name (String)

    Workflow name

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

  • start_time (Integer) (defaults to: nil)

    Start time (epoch millis, optional)

  • end_time (Integer) (defaults to: nil)

    End time (epoch millis, optional)

Returns:

  • (Array<String>)

    List of workflow IDs



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 164

# Get running workflows by name.
#
# Issues GET /workflow/running/{name} through the API client. Each optional
# filter is only added to the query string when a truthy value is supplied.
#
# @param name [String] Workflow name (path parameter)
# @param version [Integer, nil] Workflow version (optional)
# @param start_time [Integer, nil] Start time (epoch millis, optional)
# @param end_time [Integer, nil] End time (epoch millis, optional)
# @return [Array<String>] List of workflow IDs
def get_running_workflow(name, version: nil, start_time: nil, end_time: nil)
  optional_filters = { version: version, startTime: start_time, endTime: end_time }
  supplied_filters = optional_filters.select { |_key, value| value }

  request = {
    path_params: { name: name },
    query_params: supplied_filters,
    return_type: 'Array<String>',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/running/{name}', 'GET', **request)
end

#get_workflow_def(name, version: nil) ⇒ WorkflowDef

Get a workflow definition

Parameters:

  • name (String)

    Workflow name

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

Returns:

  • (WorkflowDef)

    Workflow definition



198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 198

# Get a workflow definition.
#
# Issues GET /metadata/workflow/{name} through the API client. The version
# query parameter is only sent when a truthy version is supplied.
#
# @param name [String] Workflow name (path parameter)
# @param version [Integer, nil] Workflow version (optional)
# @return [WorkflowDef] Workflow definition
def get_workflow_def(name, version: nil)
  version_filter = version ? { version: version } : {}

  request = {
    path_params: { name: name },
    query_params: version_filter,
    return_type: 'WorkflowDef',
    return_http_data_only: true
  }
  @api_client.call_api('/metadata/workflow/{name}', 'GET', **request)
end

#get_workflow_status(workflow_id, include_output: false, include_variables: false) ⇒ Hash

Get workflow status (lightweight, without full task details)

Parameters:

  • workflow_id (String)

    Workflow ID

  • include_output (Boolean) (defaults to: false)

    Include workflow output (default: false)

  • include_variables (Boolean) (defaults to: false)

    Include workflow variables (default: false)

Returns:

  • (Hash)

    Workflow status



256
257
258
259
260
261
262
263
264
265
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 256

# Get workflow status (lightweight, without full task details).
#
# Issues GET /workflow/{workflowId}/status through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param include_output [Boolean] Include workflow output (default: false)
# @param include_variables [Boolean] Include workflow variables (default: false)
# @return [Hash] Workflow status
def get_workflow_status(workflow_id, include_output: false, include_variables: false)
  inclusion_flags = { includeOutput: include_output, includeVariables: include_variables }
  request = {
    path_params: { workflowId: workflow_id },
    query_params: inclusion_flags,
    return_type: 'Hash',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/status', 'GET', **request)
end

#get_workflows(name, correlation_id, include_closed: false, include_tasks: false) ⇒ Array<Workflow>

Get workflows by correlation ID

Parameters:

  • name (String)

    Workflow name

  • correlation_id (String)

    Correlation ID

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Array<Workflow>)

    List of workflows



147
148
149
150
151
152
153
154
155
156
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 147

# Get workflows by correlation ID.
#
# Issues GET /workflow/{name}/correlated/{correlationId} through the API client.
#
# @param name [String] Workflow name (path parameter)
# @param correlation_id [String] Correlation ID (correlationId path parameter)
# @param include_closed [Boolean] Include closed workflows (default: false)
# @param include_tasks [Boolean] Include task details (default: false)
# @return [Array<Workflow>] List of workflows
def get_workflows(name, correlation_id, include_closed: false, include_tasks: false)
  request = {
    path_params: { name: name, correlationId: correlation_id },
    query_params: { includeClosed: include_closed, includeTasks: include_tasks },
    return_type: 'Array<Workflow>',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{name}/correlated/{correlationId}', 'GET', **request)
end

#get_workflows_batch(name, correlation_ids, include_closed: false, include_tasks: false) ⇒ Hash<String, Array<Workflow>>

Get workflows by multiple correlation IDs (batch)

Parameters:

  • name (String)

    Workflow name

  • correlation_ids (Array<String>)

    List of correlation IDs

  • include_closed (Boolean) (defaults to: false)

    Include closed workflows (default: false)

  • include_tasks (Boolean) (defaults to: false)

    Include task details (default: false)

Returns:

  • (Hash<String, Array<Workflow>>)



384
385
386
387
388
389
390
391
392
393
394
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 384

# Get workflows by multiple correlation IDs (batch).
#
# Issues POST /workflow/{name}/correlated through the API client, with the
# correlation IDs as the request body.
#
# NOTE(review): the documented return is Hash<String, Array<Workflow>>, but the
# return type requested from the client is 'Hash<String, Object>' — confirm
# which of the two is intended.
#
# @param name [String] Workflow name (path parameter)
# @param correlation_ids [Array<String>] List of correlation IDs (request body)
# @param include_closed [Boolean] Include closed workflows (default: false)
# @param include_tasks [Boolean] Include task details (default: false)
# @return [Hash<String, Array<Workflow>>]
def get_workflows_batch(name, correlation_ids, include_closed: false, include_tasks: false)
  request = {
    path_params: { name: name },
    query_params: { includeClosed: include_closed, includeTasks: include_tasks },
    body: correlation_ids,
    return_type: 'Hash<String, Object>',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{name}/correlated', 'POST', **request)
end

#jump_to_task(workflow_id, task_reference_name, input: nil) ⇒ void

This method returns an undefined value.

Jump to a specific task in a workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_reference_name (String)

    Task reference name

  • input (Hash) (defaults to: nil)

    Task input (optional)



425
426
427
428
429
430
431
432
433
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 425

# Jump to a specific task in a workflow.
#
# Issues POST /workflow/{workflowId}/jump/{taskReferenceName} through the API client.
#
# @param workflow_id [String] Workflow ID (workflowId path parameter)
# @param task_reference_name [String] Task reference name (taskReferenceName path parameter)
# @param input [Hash, nil] Task input, sent as the request body (optional)
# @return [void]
def jump_to_task(workflow_id, task_reference_name, input: nil)
  target = { workflowId: workflow_id, taskReferenceName: task_reference_name }
  request = { path_params: target, body: input, return_http_data_only: true }
  @api_client.call_api('/workflow/{workflowId}/jump/{taskReferenceName}', 'POST', **request)
end

#pause_workflow(workflow_id) ⇒ void

This method returns an undefined value.

Pause a workflow

Parameters:

  • workflow_id (String)

    Workflow ID



63
64
65
66
67
68
69
70
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 63

# Pause a workflow.
#
# Issues PUT /workflow/{workflowId}/pause through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @return [void]
def pause_workflow(workflow_id)
  request = {
    path_params: { workflowId: workflow_id },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/pause', 'PUT', **request)
end

#register_workflow(body, overwrite: false) ⇒ void

This method returns an undefined value.

Register a workflow definition

Parameters:

  • body (WorkflowDef)

    Workflow definition

  • overwrite (Boolean) (defaults to: false)

    Overwrite existing definition (default: false)



184
185
186
187
188
189
190
191
192
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 184

# Register a workflow definition.
#
# Issues POST /metadata/workflow through the API client.
#
# @param body [WorkflowDef] Workflow definition, sent as the request body
# @param overwrite [Boolean] Overwrite existing definition (default: false)
# @return [void]
def register_workflow(body, overwrite: false)
  request = {
    query_params: { overwrite: overwrite },
    body: body,
    return_http_data_only: true
  }
  @api_client.call_api('/metadata/workflow', 'POST', **request)
end

#rerun(workflow_id, body) ⇒ String

Rerun a workflow from a specific task

Parameters:

  • workflow_id (String)

    Workflow ID

  • body (RerunWorkflowRequest)

    Rerun request

Returns:

  • (String)

    New workflow ID



102
103
104
105
106
107
108
109
110
111
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 102

# Rerun a workflow from a specific task.
#
# Issues POST /workflow/{workflowId}/rerun through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param body [RerunWorkflowRequest] Rerun request, sent as the request body
# @return [String] New workflow ID
def rerun(workflow_id, body)
  request = {
    path_params: { workflowId: workflow_id },
    body: body,
    return_type: 'String',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/rerun', 'POST', **request)
end

#reset_workflow(workflow_id) ⇒ void

This method returns an undefined value.

Reset workflow callbacks

Parameters:

  • workflow_id (String)

    Workflow ID



297
298
299
300
301
302
303
304
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 297

# Reset workflow callbacks.
#
# Issues POST /workflow/{workflowId}/resetcallbacks through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @return [void]
def reset_workflow(workflow_id)
  request = {
    path_params: { workflowId: workflow_id },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/resetcallbacks', 'POST', **request)
end

#restart(workflow_id, use_latest_def: false) ⇒ void

This method returns an undefined value.

Restart a completed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • use_latest_def (Boolean) (defaults to: false)

    Use latest workflow definition (default: false)



88
89
90
91
92
93
94
95
96
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 88

# Restart a completed workflow.
#
# Issues POST /workflow/{workflowId}/restart through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param use_latest_def [Boolean] Use latest workflow definition; sent as the
#   useLatestDefinitions query parameter (default: false)
# @return [void]
def restart(workflow_id, use_latest_def: false)
  request = {
    path_params: { workflowId: workflow_id },
    query_params: { useLatestDefinitions: use_latest_def },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/restart', 'POST', **request)
end

#resume_workflow(workflow_id) ⇒ void

This method returns an undefined value.

Resume a paused workflow

Parameters:

  • workflow_id (String)

    Workflow ID



75
76
77
78
79
80
81
82
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 75

# Resume a paused workflow.
#
# Issues PUT /workflow/{workflowId}/resume through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @return [void]
def resume_workflow(workflow_id)
  request = {
    path_params: { workflowId: workflow_id },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/resume', 'PUT', **request)
end

#retry(workflow_id, resume_subworkflow_tasks: false) ⇒ void

This method returns an undefined value.

Retry a failed workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • resume_subworkflow_tasks (Boolean) (defaults to: false)

    Resume subworkflow tasks (default: false)



131
132
133
134
135
136
137
138
139
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 131

# Retry a failed workflow.
#
# Issues POST /workflow/{workflowId}/retry through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param resume_subworkflow_tasks [Boolean] Resume subworkflow tasks; sent as the
#   resumeSubworkflowTasks query parameter (default: false)
# @return [void]
def retry(workflow_id, resume_subworkflow_tasks: false)
  request = {
    path_params: { workflowId: workflow_id },
    query_params: { resumeSubworkflowTasks: resume_subworkflow_tasks },
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/retry', 'POST', **request)
end

#search(start: 0, size: 100, free_text: '*', query: nil, skip_cache: false) ⇒ SearchResult

Search for workflows

Parameters:

  • start (Integer) (defaults to: 0)

    Start index (default: 0)

  • size (Integer) (defaults to: 100)

    Page size (default: 100)

  • free_text (String) (defaults to: '*')

    Free text search (default: '*')

  • query (String) (defaults to: nil)

    Query string (optional)

  • skip_cache (Boolean) (defaults to: false)

    Skip cache (default: false)

Returns:

  • (SearchResult)

    Search results with workflow summaries



313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 313

# Search for workflows.
#
# Issues GET /workflow/search through the API client. The query parameter is
# only added to the query string when a truthy query is supplied.
#
# @param start [Integer] Start index (default: 0)
# @param size [Integer] Page size (default: 100)
# @param free_text [String] Free text search (default: '*')
# @param query [String, nil] Query string (optional)
# @param skip_cache [Boolean] Skip cache (default: false)
# @return [SearchResult] Search results with workflow summaries
def search(start: 0, size: 100, free_text: '*', query: nil, skip_cache: false)
  criteria = { start: start, size: size, freeText: free_text, skipCache: skip_cache }
  criteria = criteria.merge(query: query) if query

  request = {
    query_params: criteria,
    return_type: 'SearchResult',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/search', 'GET', **request)
end

#skip_task_from_workflow(workflow_id, task_reference_name, request: nil) ⇒ void

This method returns an undefined value.

Skip a task in a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_reference_name (String)

    Task reference name to skip

  • request (Hash) (defaults to: nil)

    Skip task request body (optional)



272
273
274
275
276
277
278
279
280
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 272

# Skip a task in a running workflow.
#
# Issues PUT /workflow/{workflowId}/skiptask/{taskReferenceName} through the API client.
#
# @param workflow_id [String] Workflow ID (workflowId path parameter)
# @param task_reference_name [String] Task reference name to skip (taskReferenceName path parameter)
# @param request [Hash, nil] Skip task request body (optional)
# @return [void]
def skip_task_from_workflow(workflow_id, task_reference_name, request: nil)
  target = { workflowId: workflow_id, taskReferenceName: task_reference_name }
  call_options = { path_params: target, body: request, return_http_data_only: true }
  @api_client.call_api('/workflow/{workflowId}/skiptask/{taskReferenceName}', 'PUT', **call_options)
end

#start_workflow(body) ⇒ String

Start a workflow execution

Parameters:

  • body (StartWorkflowRequest)

    Start workflow request

Returns:

  • (String)

    Workflow ID



21
22
23
24
25
26
27
28
29
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 21

# Start a workflow execution.
#
# Issues POST /workflow through the API client.
#
# @param body [StartWorkflowRequest] Start workflow request, sent as the request body
# @return [String] Workflow ID
def start_workflow(body)
  request = { body: body, return_type: 'String', return_http_data_only: true }
  @api_client.call_api('/workflow', 'POST', **request)
end

#start_workflow_by_name(name, body, version: nil, correlation_id: nil, priority: nil) ⇒ String

Start a workflow by name (alternative endpoint)

Parameters:

  • name (String)

    Workflow name

  • body (Hash)

    Workflow input

  • version (Integer) (defaults to: nil)

    Workflow version (optional)

  • correlation_id (String) (defaults to: nil)

    Correlation ID (optional)

  • priority (Integer) (defaults to: nil)

    Priority (optional)

Returns:

  • (String)

    Workflow ID



403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 403

# Start a workflow by name (alternative endpoint).
#
# Issues POST /workflow/{name} through the API client. Each optional setting is
# only added to the query string when a truthy value is supplied.
#
# @param name [String] Workflow name (path parameter)
# @param body [Hash] Workflow input, sent as the request body
# @param version [Integer, nil] Workflow version (optional)
# @param correlation_id [String, nil] Correlation ID (optional)
# @param priority [Integer, nil] Priority (optional)
# @return [String] Workflow ID
def start_workflow_by_name(name, body, version: nil, correlation_id: nil, priority: nil)
  optional_settings = { version: version, correlationId: correlation_id, priority: priority }
  supplied_settings = optional_settings.select { |_key, value| value }

  request = {
    path_params: { name: name },
    query_params: supplied_settings,
    body: body,
    return_type: 'String',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{name}', 'POST', **request)
end

#terminate(workflow_id, reason: nil) ⇒ void

This method returns an undefined value.

Terminate a running workflow

Parameters:

  • workflow_id (String)

    Workflow ID

  • reason (String) (defaults to: nil)

    Termination reason



117
118
119
120
121
122
123
124
125
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 117

# Terminate a running workflow.
#
# Issues DELETE /workflow/{workflowId} through the API client. The reason query
# parameter is only sent when a truthy reason is supplied.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param reason [String, nil] Termination reason
# @return [void]
def terminate(workflow_id, reason: nil)
  explanation = {}
  explanation[:reason] = reason if reason

  request = {
    path_params: { workflowId: workflow_id },
    query_params: explanation,
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}', 'DELETE', **request)
end

#test_workflow(body) ⇒ Workflow

Test a workflow with mocked task outputs

Parameters:

  • body (WorkflowTestRequest)

    Test request

Returns:

  • (Workflow)



368
369
370
371
372
373
374
375
376
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 368

# Test a workflow with mocked task outputs.
#
# Issues POST /workflow/test through the API client.
#
# @param body [WorkflowTestRequest] Test request, sent as the request body
# @return [Workflow] result requested with return type 'Workflow'
def test_workflow(body)
  request = { body: body, return_type: 'Workflow', return_http_data_only: true }
  @api_client.call_api('/workflow/test', 'POST', **request)
end

#unregister_workflow(name, version:) ⇒ void

This method returns an undefined value.

Delete a workflow definition

Parameters:

  • name (String)

    Workflow name

  • version (Integer)

    Workflow version



216
217
218
219
220
221
222
223
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 216

# Delete a workflow definition.
#
# Issues DELETE /metadata/workflow/{name}/{version} through the API client.
#
# @param name [String] Workflow name (path parameter)
# @param version [Integer] Workflow version (path parameter)
# @return [void]
def unregister_workflow(name, version:)
  request = {
    path_params: { name: name, version: version },
    return_http_data_only: true
  }
  @api_client.call_api('/metadata/workflow/{name}/{version}', 'DELETE', **request)
end

#update_workflow_and_task_state(workflow_id, body, request_id: nil, wait_until_task_ref: nil, wait_for_seconds: 10) ⇒ WorkflowRun

Update workflow and task state

Parameters:

  • workflow_id (String)

    Workflow ID

  • body (WorkflowStateUpdate)

    State update request

  • request_id (String) (defaults to: nil)

    Request ID (optional)

  • wait_until_task_ref (String) (defaults to: nil)

    Wait until task ref (optional)

  • wait_for_seconds (Integer) (defaults to: 10)

    Wait time in seconds (default: 10)

Returns:

  • (WorkflowRun)


348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 348

# Update workflow and task state.
#
# Issues POST /workflow/{workflowId}/state through the API client. waitForSeconds
# is always sent; requestId and waitUntilTaskRef are only sent when truthy.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param body [WorkflowStateUpdate] State update request, sent as the request body
# @param request_id [String, nil] Request ID (optional)
# @param wait_until_task_ref [String, nil] Wait until task ref (optional)
# @param wait_for_seconds [Integer] Wait time in seconds (default: 10)
# @return [WorkflowRun]
def update_workflow_and_task_state(workflow_id, body, request_id: nil, wait_until_task_ref: nil,
                                   wait_for_seconds: 10)
  optional_settings = { requestId: request_id, waitUntilTaskRef: wait_until_task_ref }
  supplied_settings = optional_settings.select { |_key, value| value }

  request = {
    path_params: { workflowId: workflow_id },
    query_params: { waitForSeconds: wait_for_seconds }.merge(supplied_settings),
    body: body,
    return_type: 'WorkflowRun',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/state', 'POST', **request)
end

#update_workflow_state(workflow_id, variables) ⇒ Workflow

Update workflow variables

Parameters:

  • workflow_id (String)

    Workflow ID

  • variables (Hash)

    Variables to update

Returns:

  • (Workflow)



330
331
332
333
334
335
336
337
338
339
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 330

# Update workflow variables.
#
# Issues POST /workflow/{workflowId}/variables through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param variables [Hash] Variables to update, sent as the request body
# @return [Workflow] result requested with return type 'Workflow'
def update_workflow_state(workflow_id, variables)
  request = {
    path_params: { workflowId: workflow_id },
    body: variables,
    return_type: 'Workflow',
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/variables', 'POST', **request)
end

#upgrade_running_workflow(workflow_id, body) ⇒ void

This method returns an undefined value.

Upgrade a running workflow to a new version

Parameters:

  • workflow_id (String)

    Workflow ID

  • body (Hash)

    Upgrade request



439
440
441
442
443
444
445
446
447
# File 'lib/conductor/http/api/workflow_resource_api.rb', line 439

# Upgrade a running workflow to a new version.
#
# Issues POST /workflow/{workflowId}/upgrade through the API client.
#
# @param workflow_id [String] Workflow ID, passed as the workflowId path parameter
# @param body [Hash] Upgrade request, sent as the request body
# @return [void]
def upgrade_running_workflow(workflow_id, body)
  request = {
    path_params: { workflowId: workflow_id },
    body: body,
    return_http_data_only: true
  }
  @api_client.call_api('/workflow/{workflowId}/upgrade', 'POST', **request)
end