Class: Hatchet::Features::Runs

Inherits:

  • Object
Defined in:
lib/hatchet/features/runs.rb

Overview

Runs client for interacting with the Hatchet workflow run management API.

This class provides a high-level interface for creating and managing workflow runs in the Hatchet system. It wraps the generated REST API client with a more convenient Ruby interface.

Examples:

Creating a workflow run

runs = Features::Runs.new(rest_client, config)
response = runs.create(
  name: "my-workflow",
  input: { key: "value" },
  additional_metadata: { source: "api" }
)

Since:

  • 0.1.0

Constant Summary

TriggerWorkflowRunRequest = ::HatchetSdkRest::V1TriggerWorkflowRunRequest

Re-export commonly used workflow run classes for convenience

Since:

  • 0.1.0

WorkflowRunDetails = ::HatchetSdkRest::V1WorkflowRunDetails

Since:

  • 0.1.0

TaskSummary = ::HatchetSdkRest::V1TaskSummary

Since:

  • 0.1.0

TaskSummaryList = ::HatchetSdkRest::V1TaskSummaryList

Since:

  • 0.1.0

TaskStatus = ::HatchetSdkRest::V1TaskStatus

Since:

  • 0.1.0

DEFAULT_SINCE_DAYS = 1

Since:

  • 0.1.0

LARGE_DATE_RANGE_WARNING_DAYS = 7

Since:

  • 0.1.0

Constructor Details

#initialize(rest_client, config, client: nil) ⇒ void

Initializes a new Runs client instance

Parameters:

  • rest_client (Object)

    The configured REST client for API communication

  • config (Hatchet::Config)

    The Hatchet configuration containing tenant_id and other settings

  • client (Hatchet::Client, nil) (defaults to: nil)

    The parent Hatchet client (used for get_run_ref and subscribe_to_stream)

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 101

def initialize(rest_client, config, client: nil)
  @rest_client = rest_client
  @config = config
  @client = client
  @workflow_runs_api = HatchetSdkRest::WorkflowRunsApi.new(rest_client)
  @task_api = HatchetSdkRest::TaskApi.new(rest_client)
end

Instance Method Details

#bulk_cancel(opts = nil, ids: nil, filters: nil) ⇒ void

This method returns an undefined value.

Cancel task or workflow runs in bulk, according to a set of filters

Parameters:

  • opts (BulkCancelReplayOpts, nil) (defaults to: nil)

    Options for bulk cancel, including filters and IDs

  • ids (Array<String>, nil) (defaults to: nil)

    List of run IDs to cancel

  • filters (Hash, nil) (defaults to: nil)

    Filter hash with :workflow_ids, :additional_metadata, :since, :until_time, :statuses

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 349

def bulk_cancel(opts = nil, ids: nil, filters: nil)
  opts ||= build_bulk_opts(ids: ids, filters: filters)
  @task_api.v1_task_cancel(
    @config.tenant_id,
    opts.to_cancel_request,
  )
end
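
Both call styles end up as a single options object: an explicit opts argument takes precedence, and ids/filters are only consulted when opts is nil. The stand-alone sketch below illustrates that precedence. The plain Hash and the method names build_bulk_opts_sketch and resolve_bulk_opts are hypothetical stand-ins for BulkCancelReplayOpts and the private build_bulk_opts helper, whose real behaviour is not shown on this page.

```ruby
# Hypothetical stand-in for the private build_bulk_opts helper.
# The real helper returns a BulkCancelReplayOpts; a Hash is used here
# so the sketch runs without the SDK.
def build_bulk_opts_sketch(ids: nil, filters: nil)
  { ids: ids, filters: filters }
end

# Mirrors `opts ||= build_bulk_opts(ids: ids, filters: filters)`:
# an explicit opts object wins, otherwise one is built from the keywords.
def resolve_bulk_opts(opts = nil, ids: nil, filters: nil)
  opts || build_bulk_opts_sketch(ids: ids, filters: filters)
end

# Explicit opts are used as-is; the ids keyword is ignored in that case.
explicit = resolve_bulk_opts({ ids: ["run-1"], filters: nil }, ids: ["ignored"])

# With no opts, the keywords are used to build the options.
from_ids = resolve_bulk_opts(ids: ["run-2", "run-3"])
```

Under this assumption, passing both an opts object and ids/filters silently ignores the keywords, so it is safest to use one style per call.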

#bulk_cancel_by_filters_with_pagination(sleep_time: 3, chunk_size: 500, since: nil, until_time: nil, statuses: nil, additional_metadata: nil, workflow_ids: nil) ⇒ void

This method returns an undefined value.

Cancel runs matching the specified filters in chunks

This method provides an easy way to perform bulk cancel operations by filters over a larger number of runs than the API would normally handle, with automatic pagination and chunking to limit pressure on the API.

Parameters:

  • sleep_time (Integer) (defaults to: 3)

    The time to sleep between processing chunks, in seconds (default: 3)

  • chunk_size (Integer) (defaults to: 500)

    The maximum number of run IDs to process in each chunk (default: 500)

  • since (Time, nil) (defaults to: nil)

    The start time for filtering runs

  • until_time (Time, nil) (defaults to: nil)

    The end time for filtering runs

  • statuses (Array, nil) (defaults to: nil)

    The statuses to filter runs by (default: RUNNING, QUEUED)

  • additional_metadata (Hash, nil) (defaults to: nil)

    Additional metadata to filter runs by

  • workflow_ids (Array<String>, nil) (defaults to: nil)

    The workflow IDs to filter runs by

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 411

def bulk_cancel_by_filters_with_pagination(sleep_time: 3, chunk_size: 500, since: nil, until_time: nil,
                                           statuses: nil, additional_metadata: nil, workflow_ids: nil)
  perform_action_with_pagination(
    action: :cancel,
    statuses: statuses || %w[RUNNING QUEUED],
    sleep_time: sleep_time,
    chunk_size: chunk_size,
    since: since,
    until_time: until_time,
    additional_metadata: additional_metadata,
    workflow_ids: workflow_ids,
  )
end
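
The private helper perform_action_with_pagination is not shown on this page. As a rough sketch of the chunking behaviour described above, assume it collects the matching run IDs and then processes them in slices of chunk_size, pausing for sleep_time seconds between slices. The method name process_in_chunks and the block-based interface are hypothetical, not part of the SDK.

```ruby
# Hypothetical helper: process run IDs in slices, pausing between slices.
# This is not the SDK's implementation; it only illustrates how the
# chunk_size and sleep_time parameters could interact.
def process_in_chunks(run_ids, chunk_size: 500, sleep_time: 3)
  processed = 0
  slices = run_ids.each_slice(chunk_size).to_a

  slices.each_with_index do |chunk, index|
    yield chunk # e.g. cancel or replay this chunk of IDs
    processed += chunk.size

    # Pause between chunks, but not after the last one.
    sleep(sleep_time) if index < slices.size - 1
  end

  processed
end

# Usage with fake IDs and no real API calls.
batches = []
fake_ids = (1..7).map { |i| "run-#{i}" }
total = process_in_chunks(fake_ids, chunk_size: 3, sleep_time: 0) do |chunk|
  batches << chunk
end
```

Smaller chunks and a longer sleep reduce pressure on the API at the cost of a slower overall operation.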

#bulk_replay(opts = nil, ids: nil, filters: nil) ⇒ void

This method returns an undefined value.

Replay task or workflow runs in bulk, according to a set of filters

Parameters:

  • opts (BulkCancelReplayOpts, nil) (defaults to: nil)

    Options for bulk replay, including filters and IDs

  • ids (Array<String>, nil) (defaults to: nil)

    List of run IDs to replay

  • filters (Hash, nil) (defaults to: nil)

    Filter hash with :workflow_ids, :additional_metadata, :since, :until_time, :statuses

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 325

def bulk_replay(opts = nil, ids: nil, filters: nil)
  opts ||= build_bulk_opts(ids: ids, filters: filters)
  @task_api.v1_task_replay(
    @config.tenant_id,
    opts.to_replay_request,
  )
end

#bulk_replay_by_filters_with_pagination(sleep_time: 3, chunk_size: 500, since: nil, until_time: nil, statuses: nil, additional_metadata: nil, workflow_ids: nil) ⇒ void

This method returns an undefined value.

Replay runs matching the specified filters in chunks

This method provides an easy way to perform bulk replay operations by filters over a larger number of runs than the API would normally handle, with automatic pagination and chunking to limit pressure on the API.

Parameters:

  • sleep_time (Integer) (defaults to: 3)

    The time to sleep between processing chunks, in seconds (default: 3)

  • chunk_size (Integer) (defaults to: 500)

    The maximum number of run IDs to process in each chunk (default: 500)

  • since (Time, nil) (defaults to: nil)

    The start time for filtering runs

  • until_time (Time, nil) (defaults to: nil)

    The end time for filtering runs

  • statuses (Array, nil) (defaults to: nil)

    The statuses to filter runs by (default: FAILED, CANCELLED)

  • additional_metadata (Hash, nil) (defaults to: nil)

    Additional metadata to filter runs by

  • workflow_ids (Array<String>, nil) (defaults to: nil)

    The workflow IDs to filter runs by

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 382

def bulk_replay_by_filters_with_pagination(sleep_time: 3, chunk_size: 500, since: nil, until_time: nil,
                                           statuses: nil, additional_metadata: nil, workflow_ids: nil)
  perform_action_with_pagination(
    action: :replay,
    statuses: statuses || %w[FAILED CANCELLED],
    sleep_time: sleep_time,
    chunk_size: chunk_size,
    since: since,
    until_time: until_time,
    additional_metadata: additional_metadata,
    workflow_ids: workflow_ids,
  )
end

#cancel(run_id) ⇒ void

This method returns an undefined value.

Cancel a task or workflow run

Parameters:

  • run_id (String)

    The external ID of the task or workflow run to cancel

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 338

def cancel(run_id)
  bulk_cancel(BulkCancelReplayOpts.new(ids: [run_id]))
end

#create(name:, input:, additional_metadata: nil, priority: nil) ⇒ HatchetSdkRest::V1WorkflowRunDetails

Creates a new workflow run in the Hatchet system

This method triggers a new run of the named workflow or task with the provided input data. The run is queued according to the workflow definition and executed on an available worker.

IMPORTANT: Prefer `Workflow.run` (and similar) to trigger workflows where possible. This method is intended as an escape hatch.

Examples:

Creating a workflow run

response = runs.create(
  name: "simple-workflow",
  input: { user_id: 123, action: "process_data" },
  additional_metadata: { source: "api", priority: "high" }
)

Parameters:

  • name (String)

    The name of the workflow to trigger

  • input (Hash)

    The input data for the workflow run

  • additional_metadata (Hash, nil) (defaults to: nil)

    Additional metadata associated with the workflow run

  • priority (Integer, nil) (defaults to: nil)

    The priority of the workflow run

Returns:

Raises:

  • (ArgumentError)

    If the name or input parameters are nil or invalid

  • (Hatchet::Error)

    If the API request fails or returns an error

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 297

def create(name:, input:, additional_metadata: nil, priority: nil)
  trigger_request = HatchetSdkRest::V1TriggerWorkflowRunRequest.new(
    workflow_name: @config.apply_namespace(name),
    input: input,
    additional_metadata: additional_metadata,
    priority: priority,
  )

  run = @workflow_runs_api.v1_workflow_run_create(@config.tenant_id, trigger_request)
  run.run
end

#get(workflow_run_id) ⇒ HatchetSdkRest::V1WorkflowRun

Get a workflow run by its ID

Returns the unwrapped V1WorkflowRun object directly (with status, output, etc.). Use #get_details if you need the full details wrapper (task_events, shape, tasks, etc.).

Parameters:

  • workflow_run_id (String)

    The ID of the workflow run to retrieve

Returns:

  • (HatchetSdkRest::V1WorkflowRun)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 126

def get(workflow_run_id)
  details = @workflow_runs_api.v1_workflow_run_get(workflow_run_id.to_s)
  details.run
end

#get_details(workflow_run_id) ⇒ HatchetSdkRest::V1WorkflowRunDetails

Get full workflow run details for a given workflow run ID

Returns the full V1WorkflowRunDetails including task_events, shape, tasks, and workflow_config.

Parameters:

  • workflow_run_id (String)

    The ID of the workflow run to retrieve details for

Returns:

  • (HatchetSdkRest::V1WorkflowRunDetails)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 138

def get_details(workflow_run_id)
  @workflow_runs_api.v1_workflow_run_get(workflow_run_id.to_s)
end

#get_result(run_id) ⇒ Hash

Get the result of a workflow run by its external ID

Parameters:

  • run_id (String)

    The external ID of the workflow run to retrieve the result for

Returns:

  • (Hash)

    The result of the workflow run

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 362

def get_result(run_id)
  run = get(run_id)
  run.output
end
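
The relationship between #get_details, #get and #get_result can be pictured with plain Struct stand-ins. The field lists below are abbreviated to the names mentioned on this page, and the "COMPLETED" status string and the sample output are illustrative assumptions rather than values taken from the SDK.

```ruby
# Stand-ins for V1WorkflowRun and V1WorkflowRunDetails (abbreviated).
workflow_run = Struct.new(:status, :output, keyword_init: true)
run_details = Struct.new(:run, :tasks, :task_events, :shape, keyword_init: true)

# What #get_details would hand back: the full wrapper.
details = run_details.new(
  run: workflow_run.new(status: "COMPLETED", output: { "result" => 42 }),
  tasks: [],
  task_events: [],
  shape: []
)

# What #get returns: the unwrapped run inside the wrapper.
unwrapped = details.run

# What #get_result returns: only the output of the unwrapped run.
result = unwrapped.output
```

In other words, each method peels one layer off the previous one: details, then the run, then the run's output.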

#get_run_ref(workflow_run_id) ⇒ Hatchet::WorkflowRunRef

Get a reference to a workflow run

Parameters:

  • workflow_run_id (String)

    The ID of the workflow run to get a reference to

Returns:

  • (Hatchet::WorkflowRunRef)

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 429

def get_run_ref(workflow_run_id)
  Hatchet::WorkflowRunRef.new(
    workflow_run_id: workflow_run_id,
    client: @client,
    listener: @client&.workflow_run_listener,
  )
end

#get_status(workflow_run_id) ⇒ HatchetSdkRest::V1TaskStatus

Get workflow run status for a given workflow run ID

Parameters:

  • workflow_run_id (String)

    The ID of the workflow run to retrieve status for

Returns:

  • (HatchetSdkRest::V1TaskStatus)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 147

def get_status(workflow_run_id)
  @workflow_runs_api.v1_workflow_run_get_status(workflow_run_id)
end

#get_task_run(task_run_id) ⇒ HatchetSdkRest::V1TaskSummary

Get task run details for a given task run ID

Parameters:

  • task_run_id (String)

    The ID of the task run to retrieve details for

Returns:

  • (HatchetSdkRest::V1TaskSummary)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 114

def get_task_run(task_run_id)
  @task_api.v1_task_get(task_run_id)
end

#list(since: nil, only_tasks: false, offset: nil, limit: nil, statuses: nil, until_time: nil, additional_metadata: nil, workflow_ids: nil, worker_id: nil, parent_task_external_id: nil, triggering_event_external_id: nil, include_payloads: true) ⇒ HatchetSdkRest::V1TaskSummaryList

List task runs according to a set of filters

Parameters:

  • since (Time, nil) (defaults to: nil)

    The start time for filtering task runs

  • only_tasks (Boolean) (defaults to: false)

    Whether to only list task runs

  • offset (Integer, nil) (defaults to: nil)

    The offset for pagination

  • limit (Integer, nil) (defaults to: nil)

    The maximum number of task runs to return

  • statuses (Array<HatchetSdkRest::V1TaskStatus>, nil) (defaults to: nil)

    The statuses to filter task runs by

  • until_time (Time, nil) (defaults to: nil)

    The end time for filtering task runs

  • additional_metadata (Hash<String, String>, nil) (defaults to: nil)

    Additional metadata to filter task runs by

  • workflow_ids (Array<String>, nil) (defaults to: nil)

    The workflow IDs to filter task runs by

  • worker_id (String, nil) (defaults to: nil)

    The worker ID to filter task runs by

  • parent_task_external_id (String, nil) (defaults to: nil)

    The parent task external ID to filter task runs by

  • triggering_event_external_id (String, nil) (defaults to: nil)

    The external ID of the event that triggered the task run

  • include_payloads (Boolean) (defaults to: true)

    Whether to include payloads in the response (default: true)

Returns:

  • (HatchetSdkRest::V1TaskSummaryList)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 233

def list(
  since: nil,
  only_tasks: false,
  offset: nil,
  limit: nil,
  statuses: nil,
  until_time: nil,
  additional_metadata: nil,
  workflow_ids: nil,
  worker_id: nil,
  parent_task_external_id: nil,
  triggering_event_external_id: nil,
  include_payloads: true
)
  since ||= (Time.now - (DEFAULT_SINCE_DAYS * 24 * 60 * 60))
  until_time ||= Time.now

  if (until_time - since) / (24 * 60 * 60) >= LARGE_DATE_RANGE_WARNING_DAYS
    @config.logger.warn "Listing runs with a date range longer than #{LARGE_DATE_RANGE_WARNING_DAYS} days " \
                        "may result in performance issues. " \
                        "Consider using `list_with_pagination` instead."
  end

  @workflow_runs_api.v1_workflow_run_list(
    @config.tenant_id,
    since.utc.iso8601,
    only_tasks,
    {
      offset: offset,
      limit: limit,
      statuses: statuses,
      _until: until_time.utc.iso8601,
      additional_metadata: (),
      workflow_ids: workflow_ids,
      worker_id: worker_id,
      parent_task_external_id: parent_task_external_id,
      triggering_event_external_id: triggering_event_external_id,
      include_payloads: include_payloads,
    },
  )
end
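
The warning condition in #list compares the requested range, expressed in days, against LARGE_DATE_RANGE_WARNING_DAYS. A minimal stand-alone sketch of that arithmetic follows; the method name large_date_range? is hypothetical, and the default of 7 is copied from the constant listed on this page.

```ruby
# Hypothetical predicate mirroring the range check in #list.
# Subtracting two Time objects yields the difference in seconds as a Float.
def large_date_range?(since, until_time, warning_days: 7)
  seconds_per_day = 24 * 60 * 60
  (until_time - since) / seconds_per_day >= warning_days
end

now = Time.utc(2024, 1, 10)
one_day_ago = now - (1 * 24 * 60 * 60)    # the default window (DEFAULT_SINCE_DAYS = 1)
seven_days_ago = now - (7 * 24 * 60 * 60) # exactly the warning threshold

default_range_warns = large_date_range?(one_day_ago, now)
week_range_warns = large_date_range?(seven_days_ago, now)
```

Because the comparison uses >=, a range of exactly seven days already triggers the warning, while the default one-day window does not.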

#list_with_pagination(since: nil, only_tasks: false, offset: nil, limit: nil, statuses: nil, until_time: nil, additional_metadata: nil, workflow_ids: nil, worker_id: nil, parent_task_external_id: nil, triggering_event_external_id: nil, include_payloads: true) ⇒ Array<HatchetSdkRest::V1TaskSummary>

List task runs according to a set of filters, paginating through days

Parameters:

  • since (Time, nil) (defaults to: nil)

    The start time for filtering task runs

  • only_tasks (Boolean) (defaults to: false)

    Whether to only list task runs

  • offset (Integer, nil) (defaults to: nil)

    The offset for pagination

  • limit (Integer, nil) (defaults to: nil)

    The maximum number of task runs to return

  • statuses (Array<HatchetSdkRest::V1TaskStatus>, nil) (defaults to: nil)

    The statuses to filter task runs by

  • until_time (Time, nil) (defaults to: nil)

    The end time for filtering task runs

  • additional_metadata (Hash<String, String>, nil) (defaults to: nil)

    Additional metadata to filter task runs by

  • workflow_ids (Array<String>, nil) (defaults to: nil)

    The workflow IDs to filter task runs by

  • worker_id (String, nil) (defaults to: nil)

    The worker ID to filter task runs by

  • parent_task_external_id (String, nil) (defaults to: nil)

    The parent task external ID to filter task runs by

  • triggering_event_external_id (String, nil) (defaults to: nil)

    The external ID of the event that triggered the task run

  • include_payloads (Boolean) (defaults to: true)

    Whether to include payloads in the response (default: true)

Returns:

  • (Array<HatchetSdkRest::V1TaskSummary>)

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 167

def list_with_pagination(
  since: nil,
  only_tasks: false,
  offset: nil,
  limit: nil,
  statuses: nil,
  until_time: nil,
  additional_metadata: nil,
  workflow_ids: nil,
  worker_id: nil,
  parent_task_external_id: nil,
  triggering_event_external_id: nil,
  include_payloads: true
)
  date_ranges = partition_date_range(
    since: since || (Time.now - (DEFAULT_SINCE_DAYS * 24 * 60 * 60)),
    until_time: until_time || Time.now,
  )

  responses = date_ranges.map do |start_time, end_time|
    @workflow_runs_api.v1_workflow_run_list(
      @config.tenant_id,
      start_time.utc.iso8601,
      only_tasks,
      {
        offset: offset,
        limit: limit,
        statuses: statuses,
        _until: end_time.utc.iso8601,
        additional_metadata: (),
        workflow_ids: workflow_ids,
        worker_id: worker_id,
        parent_task_external_id: parent_task_external_id,
        triggering_event_external_id: triggering_event_external_id,
        include_payloads: include_payloads,
      },
    )
  end

  # De-duplicate by run ID: adjacent date windows can return the same run
  run_id_to_run = {}
  responses.each do |record|
    record.rows.each do |run|
      run_id_to_run[run.metadata.id] = run
    end
  end

  run_id_to_run.values.sort_by(&:created_at).reverse
end
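
partition_date_range is a private helper that is not shown on this page. The sketch below assumes it splits the range into consecutive windows of at most one day, and then repeats the de-duplication and sorting steps from the method body using a simplified Struct in place of the API model. The helper body, the window_seconds parameter and the flat id field are all hypothetical.

```ruby
# Hypothetical stand-in for the private helper: split [since, until_time]
# into consecutive [start, end] windows of at most window_seconds each.
def partition_date_range(since:, until_time:, window_seconds: 24 * 60 * 60)
  ranges = []
  cursor = since

  while cursor < until_time
    window_end = [cursor + window_seconds, until_time].min
    ranges << [cursor, window_end]
    cursor = window_end
  end

  ranges
end

windows = partition_date_range(
  since: Time.utc(2024, 1, 1),
  until_time: Time.utc(2024, 1, 3, 12)
)

# Simplified stand-in for a task summary row.
run = Struct.new(:id, :created_at)
rows = [
  run.new("a", Time.utc(2024, 1, 1, 10)),
  run.new("b", Time.utc(2024, 1, 2, 9)),
  run.new("a", Time.utc(2024, 1, 1, 10)) # same run returned by two windows
]

# Later duplicates overwrite earlier ones, so each ID appears once.
unique = rows.each_with_object({}) { |row, by_id| by_id[row.id] = row }
newest_first = unique.values.sort_by(&:created_at).reverse
```

Keying the hash by run ID is what guarantees uniqueness; the final sort restores a newest-first ordering across all windows.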

#poll(workflow_run_id, interval: 1.0, timeout: nil) ⇒ HatchetSdkRest::V1WorkflowRun

Poll for workflow run completion with configurable interval and timeout

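This method repeatedly calls `get` until the workflow run reaches a terminal state (succeeded, failed, or cancelled) or the timeout is reached. A 404 response is treated as "not available yet" and retried, since a run triggered over gRPC may not be visible through the REST API immediately.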

Examples:

Poll with default settings (1s interval, no timeout)

result = runs.poll("workflow-run-123")

Poll with custom interval and timeout

result = runs.poll("workflow-run-123", interval: 2.0, timeout: 30.0)

Parameters:

  • workflow_run_id (String)

    The ID of the workflow run to poll

  • interval (Numeric) (defaults to: 1.0)

    The polling interval in seconds (default: 1.0)

  • timeout (Numeric, nil) (defaults to: nil)

    The maximum time to poll in seconds (default: no timeout)

Returns:

Raises:

  • (Timeout::Error)

    If the timeout is reached before completion

  • (Hatchet::Error)

    If the API request fails or returns an error

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 453

def poll(workflow_run_id, interval: 1.0, timeout: nil)
  start_time = Time.now

  loop do
    begin
      run = get(workflow_run_id)
    rescue HatchetSdkRest::ApiError => e
      raise unless e.code == 404

      # Run may not be available via REST API yet after gRPC trigger
      raise Timeout::Error, "Polling timed out after #{timeout} seconds" if timeout && (Time.now - start_time) >= timeout

      sleep(interval)
      next
    end

    status = run.status

    return run if terminal_status?(status)

    # Check timeout
    raise Timeout::Error, "Polling timed out after #{timeout} seconds" if timeout && (Time.now - start_time) >= timeout

    sleep(interval)
  end
end
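
The polling pattern used by #poll can be tried in isolation with a stubbed status source. In the sketch below, poll_until_terminal is a hypothetical helper, and the set of terminal status strings is an assumption, since terminal_status? is a private method not shown on this page.

```ruby
require "timeout"

# Hypothetical polling helper: calls the block until it returns a status in
# terminal_statuses, or raises Timeout::Error once timeout seconds elapse.
def poll_until_terminal(interval: 1.0, timeout: nil,
                        terminal_statuses: %w[COMPLETED FAILED CANCELLED])
  started_at = Time.now

  loop do
    status = yield
    return status if terminal_statuses.include?(status)

    if timeout && (Time.now - started_at) >= timeout
      raise Timeout::Error, "Polling timed out after #{timeout} seconds"
    end

    sleep(interval)
  end
end

# Stub that reports RUNNING twice and then COMPLETED.
statuses = %w[RUNNING RUNNING COMPLETED]
calls = 0
final_status = poll_until_terminal(interval: 0.01, timeout: 5) do
  calls += 1
  statuses.shift
end

# A stub that never finishes trips the timeout.
timed_out = begin
  poll_until_terminal(interval: 0.01, timeout: 0.05) { "RUNNING" }
  false
rescue Timeout::Error
  true
end
```

As in #poll, the timeout is only checked between attempts, so the total wait can exceed the timeout by up to one interval plus the duration of a request.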

#replay(run_id) ⇒ void

This method returns an undefined value.

Replay a task or workflow run

Parameters:

  • run_id (String)

    The external ID of the task or workflow run to replay

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 314

def replay(run_id)
  bulk_replay(BulkCancelReplayOpts.new(ids: [run_id]))
end

#subscribe_to_stream(workflow_run_id) {|String| ... } ⇒ void

This method returns an undefined value.

Subscribe to stream events for a workflow run.

Opens a gRPC server-streaming subscription to `SubscribeToWorkflowEvents` and yields each stream chunk payload to the given block. Returns immediately without subscribing if no block is given.

Parameters:

  • workflow_run_id (String)

    The workflow run ID to subscribe to

Yields:

  • (String)

    Each stream chunk payload

Raises:

Since:

  • 0.1.0



# File 'lib/hatchet/features/runs.rb', line 489

def subscribe_to_stream(workflow_run_id)
  return unless block_given?

  stub = ::Dispatcher::Stub.new(
    @config.host_port,
    nil,
    channel_override: @client.channel,
  )

  request = ::SubscribeToWorkflowEventsRequest.new(
    workflow_run_id: workflow_run_id,
  )

  response_stream = stub.subscribe_to_workflow_events(
    request,
    metadata: @config.,
  )

  response_stream.each do |event|
    # Filter for stream events (RESOURCE_EVENT_TYPE_STREAM = 6)
    yield event.event_payload if event.event_type == :RESOURCE_EVENT_TYPE_STREAM

    # Stop if we get a hangup signal
    break if event.respond_to?(:hangup) && event.hangup
  end
end
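
The filtering loop at the end of #subscribe_to_stream can be exercised without a gRPC connection. The sketch below replays it over an in-memory array; the Struct stands in for the generated event message, :OTHER_EVENT_TYPE is a placeholder for any non-stream event type, and each_stream_payload is a hypothetical helper name.

```ruby
# Stand-in for the gRPC event messages; the real objects come from the
# generated dispatcher client.
event = Struct.new(:event_type, :event_payload, :hangup)

stream = [
  event.new(:OTHER_EVENT_TYPE, "", false),                     # skipped
  event.new(:RESOURCE_EVENT_TYPE_STREAM, "chunk-1", false),    # yielded
  event.new(:RESOURCE_EVENT_TYPE_STREAM, "chunk-2", true),     # yielded, then hangup
  event.new(:RESOURCE_EVENT_TYPE_STREAM, "never-seen", false)  # after hangup
]

# Hypothetical helper reproducing the loop body of #subscribe_to_stream.
def each_stream_payload(events)
  return unless block_given?

  events.each do |ev|
    # Only stream events carry chunk payloads for the caller.
    yield ev.event_payload if ev.event_type == :RESOURCE_EVENT_TYPE_STREAM

    # Stop once the server signals a hangup.
    break if ev.respond_to?(:hangup) && ev.hangup
  end
end

payloads = []
each_stream_payload(stream) { |payload| payloads << payload }
```

Note the ordering: the payload of the event that carries the hangup flag is still yielded before the loop stops.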