Class: Conductor::Client::TaskClient

Inherits:
Object
  • Object
show all
Defined in:
lib/conductor/client/task_client.rb

Overview

TaskClient - High-level client for task operations

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration = nil) ⇒ TaskClient

Initialize TaskClient

Parameters:

  • configuration (Configuration) (defaults to: nil)

    Optional configuration



15
16
17
18
19
# File 'lib/conductor/client/task_client.rb', line 15

def initialize(configuration = nil)
  @configuration = configuration || Configuration.new
  api_client = Http::ApiClient.new(configuration: @configuration)
  @task_api = Http::Api::TaskResourceApi.new(api_client)
end

Instance Attribute Details

#task_apiObject (readonly)

Returns the value of attribute task_api.



11
12
13
# File 'lib/conductor/client/task_client.rb', line 11

def task_api
  @task_api
end

Instance Method Details

#add_task_log(task_id, log_message) ⇒ void

This method returns an undefined value.

Add task execution log

Parameters:

  • task_id (String)

    Task ID

  • log_message (String)

    Log message



87
88
89
# File 'lib/conductor/client/task_client.rb', line 87

def add_task_log(task_id, log_message)
  @task_api.log(task_id, log_message)
end

#batch_poll_tasks(task_type, count: 1, timeout: 100, worker_id: nil, domain: nil) ⇒ Array<Task>

Batch poll for tasks

Parameters:

  • task_type (String)

    Task type to poll

  • count (Integer) (defaults to: 1)

    Number of tasks to poll (default: 1)

  • timeout (Integer) (defaults to: 100)

    Timeout in milliseconds (default: 100)

  • worker_id (String) (defaults to: nil)

    Worker ID (optional)

  • domain (String) (defaults to: nil)

    Domain (optional)

Returns:

  • (Array<Task>)

    Array of tasks



37
38
39
# File 'lib/conductor/client/task_client.rb', line 37

def batch_poll_tasks(task_type, count: 1, timeout: 100, worker_id: nil, domain: nil)
  @task_api.batch_poll(task_type, count: count, timeout: timeout, worker_id: worker_id, domain: domain)
end

#get_all_poll_dataArray<PollData>

Get all poll data

Returns:

  • (Array<PollData>)


138
139
140
# File 'lib/conductor/client/task_client.rb', line 138

def get_all_poll_data
  @task_api.get_all_poll_data
end

#get_all_queue_detailsHash<String, Integer>

Get all queue details

Returns:

  • (Hash<String, Integer>)

    Map of task type to queue size



72
73
74
# File 'lib/conductor/client/task_client.rb', line 72

def get_all_queue_details
  @task_api.all_queue_details
end

#get_pending_tasks(task_type, start: 0, count: 100) ⇒ Array<Task>

Get pending tasks for a task type

Parameters:

  • task_type (String)

    Task type

  • start (Integer) (defaults to: 0)

    Start index (default: 0)

  • count (Integer) (defaults to: 100)

    Number of tasks (default: 100)

Returns:

  • (Array<Task>)

    Array of pending tasks



103
104
105
# File 'lib/conductor/client/task_client.rb', line 103

def get_pending_tasks(task_type, start: 0, count: 100)
  @task_api.get_pending_task_for_task_type(task_type, start: start, count: count)
end

#get_queue_details(task_type) ⇒ Hash

Get queue details for a specific task type

Parameters:

  • task_type (String)

    Task type

Returns:

  • (Hash)

    Queue details



79
80
81
# File 'lib/conductor/client/task_client.rb', line 79

def get_queue_details(task_type)
  @task_api.get_task_queue_details(task_type)
end

#get_queue_sizes(task_types: nil) ⇒ Hash<String, Integer>

Get queue sizes for task types

Parameters:

  • task_types (Array<String>) (defaults to: nil)

    List of task types (optional)

Returns:

  • (Hash<String, Integer>)

    Map of task type to queue size



66
67
68
# File 'lib/conductor/client/task_client.rb', line 66

def get_queue_sizes(task_types: nil)
  @task_api.size(task_types: task_types)
end

#get_queue_sizes_for_tasks(task_types) ⇒ Hash<String, Integer>

Get queue sizes for specific task types

Parameters:

  • task_types (Array<String>)

    List of task type names

Returns:

  • (Hash<String, Integer>)


163
164
165
# File 'lib/conductor/client/task_client.rb', line 163

def get_queue_sizes_for_tasks(task_types)
  @task_api.get_queue_sizes_for_tasks(task_types)
end

#get_task(task_id) ⇒ Task

Get task details

Parameters:

  • task_id (String)

    Task ID

Returns:

  • (Task)

    Task object



51
52
53
# File 'lib/conductor/client/task_client.rb', line 51

def get_task(task_id)
  @task_api.get_task(task_id)
end

#get_task_logs(task_id) ⇒ Array<TaskExecLog>

Get task execution logs

Parameters:

  • task_id (String)

    Task ID

Returns:

  • (Array<TaskExecLog>)

    Array of task execution logs



94
95
96
# File 'lib/conductor/client/task_client.rb', line 94

def get_task_logs(task_id)
  @task_api.get_task_logs(task_id)
end

#get_task_poll_data(task_type) ⇒ Array<PollData>

Get poll data for a task type

Parameters:

  • task_type (String)

    Task type name

Returns:

  • (Array<PollData>)


132
133
134
# File 'lib/conductor/client/task_client.rb', line 132

def get_task_poll_data(task_type)
  @task_api.get_poll_data(task_type)
end

#poll_task(task_type, worker_id: nil, domain: nil) ⇒ Task?

Poll for a task

Parameters:

  • task_type (String)

    Task type to poll

  • worker_id (String) (defaults to: nil)

    Worker ID (optional)

  • domain (String) (defaults to: nil)

    Domain (optional)

Returns:

  • (Task, nil)

    Task object or nil if no task available



26
27
28
# File 'lib/conductor/client/task_client.rb', line 26

def poll_task(task_type, worker_id: nil, domain: nil)
  @task_api.poll(task_type, worker_id: worker_id, domain: domain)
end

#remove_task_from_queue(task_type, task_id) ⇒ void

This method returns an undefined value.

Remove task from queue

Parameters:

  • task_type (String)

    Task type

  • task_id (String)

    Task ID



59
60
61
# File 'lib/conductor/client/task_client.rb', line 59

def remove_task_from_queue(task_type, task_id)
  @task_api.remove_task_from_queue(task_type, task_id)
end

#requeue_pending_task(task_type) ⇒ String

Requeue pending tasks

Parameters:

  • task_type (String)

    Task type name

Returns:

  • (String)


145
146
147
# File 'lib/conductor/client/task_client.rb', line 145

def requeue_pending_task(task_type)
  @task_api.requeue_pending_task(task_type)
end

#search(start: 0, size: 100, sort: nil, free_text: '*', query: nil) ⇒ SearchResult

Search for tasks

Parameters:

  • start (Integer) (defaults to: 0)

    Start index (default: 0)

  • size (Integer) (defaults to: 100)

    Page size (default: 100)

  • sort (String) (defaults to: nil)

    Sort order (optional)

  • free_text (String) (defaults to: '*')

    Free text search (default: '*')

  • query (String) (defaults to: nil)

    Query string (optional)

Returns:

  • (SearchResult)


156
157
158
# File 'lib/conductor/client/task_client.rb', line 156

def search(start: 0, size: 100, sort: nil, free_text: '*', query: nil)
  @task_api.search(start: start, size: size, sort: sort, free_text: free_text, query: query)
end

#update_task(task_result) ⇒ String

Update task status

Parameters:

  • task_result (TaskResult)

    Task result

Returns:

  • (String)

    Task ID



44
45
46
# File 'lib/conductor/client/task_client.rb', line 44

def update_task(task_result)
  @task_api.update_task(task_result)
end

#update_task_by_ref_name(workflow_id, task_ref_name, status, output: nil, worker_id: nil) ⇒ String

Update task by reference name

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_ref_name (String)

    Task reference name

  • status (String)

    New status

  • output (Hash) (defaults to: nil)

    Task output data (optional)

  • worker_id (String) (defaults to: nil)

    Worker ID (optional)

Returns:

  • (String)

    Updated workflow ID



114
115
116
# File 'lib/conductor/client/task_client.rb', line 114

def update_task_by_ref_name(workflow_id, task_ref_name, status, output: nil, worker_id: nil)
  @task_api.update_task_by_ref_name(workflow_id, task_ref_name, status, output: output, worker_id: worker_id)
end

#update_task_sync(workflow_id, task_ref_name, status, output: nil, worker_id: nil) ⇒ Workflow

Update task by reference name synchronously (returns workflow state)

Parameters:

  • workflow_id (String)

    Workflow ID

  • task_ref_name (String)

    Task reference name

  • status (String)

    New status

  • output (Hash) (defaults to: nil)

    Task output data (optional)

  • worker_id (String) (defaults to: nil)

    Worker ID (optional)

Returns:



125
126
127
# File 'lib/conductor/client/task_client.rb', line 125

def update_task_sync(workflow_id, task_ref_name, status, output: nil, worker_id: nil)
  @task_api.update_task_sync(workflow_id, task_ref_name, status, output: output, worker_id: worker_id)
end