Class: Hatchet::Features::Scheduled

Inherits:
Object
Defined in:
lib/hatchet/features/scheduled.rb

Overview

Scheduled client for managing scheduled workflows within Hatchet

This class provides a high-level interface for creating, deleting, rescheduling, listing, and retrieving scheduled workflow runs, including bulk delete and bulk update operations.

Examples:

Creating a scheduled workflow

scheduled = scheduled_client.create(
  workflow_name: "my-workflow",
  trigger_at: Time.now + 3600,
  input: { key: "value" },
  additional_metadata: { source: "api" }
)

Since:

  • 0.1.0

Constructor Details

#initialize(rest_client, config) ⇒ void

Initializes a new Scheduled client instance

Parameters:

  • rest_client (Object)

    The configured REST client for API communication

  • config (Hatchet::Config)

    The Hatchet configuration containing tenant_id and other settings

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 28

def initialize(rest_client, config)
  @rest_client = rest_client
  @config = config
  @workflow_api = HatchetSdkRest::WorkflowApi.new(rest_client)
  @workflow_run_api = HatchetSdkRest::WorkflowRunApi.new(rest_client)
end

Instance Method Details

#bulk_delete(scheduled_ids: nil, workflow_id: nil, parent_workflow_run_id: nil, parent_step_run_id: nil, statuses: nil, additional_metadata: nil) ⇒ Object

Bulk delete scheduled workflow runs

Provide either scheduled_ids (explicit list) or one or more filter fields.

Parameters:

  • scheduled_ids (Array<String>, nil) (defaults to: nil)

    Explicit list of scheduled workflow run IDs to delete

  • workflow_id (String, nil) (defaults to: nil)

    Filter by workflow ID

  • parent_workflow_run_id (String, nil) (defaults to: nil)

    Filter by parent workflow run ID

  • parent_step_run_id (String, nil) (defaults to: nil)

    Filter by parent step run ID

  • statuses (Array<String>, nil) (defaults to: nil)

    Filter by scheduled run statuses (not supported for bulk delete: if given, a warning is printed and the value is ignored)

  • additional_metadata (Hash, nil) (defaults to: nil)

    Filter by additional metadata key/value pairs

Returns:

  • (Object)

    The bulk delete response containing deleted IDs and per-item errors

Raises:

  • (ArgumentError)

    If neither scheduled_ids nor any filter field is provided

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 110

def bulk_delete(scheduled_ids: nil, workflow_id: nil, parent_workflow_run_id: nil,
                parent_step_run_id: nil, statuses: nil, additional_metadata: nil)
  warn "The 'statuses' filter is not supported for bulk delete and will be ignored." if statuses

  has_filter = [workflow_id, parent_workflow_run_id, parent_step_run_id, additional_metadata].any? { |v| !v.nil? }

  raise ArgumentError, "bulk_delete requires either scheduled_ids or at least one filter field." unless scheduled_ids || has_filter

  filter_obj = nil
  if has_filter
    filter_obj = HatchetSdkRest::ScheduledWorkflowsBulkDeleteFilter.new(
      workflow_id: workflow_id,
      parent_workflow_run_id: parent_workflow_run_id,
      parent_step_run_id: parent_step_run_id,
      additional_metadata: additional_metadata, # assumed: passed through as given
    )
  end

  request = HatchetSdkRest::ScheduledWorkflowsBulkDeleteRequest.new(
    scheduled_workflow_run_ids: scheduled_ids,
    filter: filter_obj,
  )

  @workflow_api.workflow_scheduled_bulk_delete(@config.tenant_id, request)
end
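
The precondition above can be tried without a server. The following is a minimal sketch, assuming additional_metadata counts as a filter field as the parameter list describes; bulk_delete_mode is a hypothetical helper written for illustration and is not part of the SDK.

```ruby
# Hypothetical helper mirroring the precondition in bulk_delete:
# callers must pass scheduled_ids or at least one filter field.
def bulk_delete_mode(scheduled_ids: nil, workflow_id: nil, parent_workflow_run_id: nil,
                     parent_step_run_id: nil, additional_metadata: nil)
  filters = {
    workflow_id: workflow_id,
    parent_workflow_run_id: parent_workflow_run_id,
    parent_step_run_id: parent_step_run_id,
    additional_metadata: additional_metadata,
  }.compact # drop filters that were not supplied

  if scheduled_ids.nil? && filters.empty?
    raise ArgumentError, "bulk_delete requires either scheduled_ids or at least one filter field."
  end

  { ids: scheduled_ids, filters: filters }
end

# Delete by explicit IDs, or by filter.
by_ids    = bulk_delete_mode(scheduled_ids: ["scheduled-1", "scheduled-2"])
by_filter = bulk_delete_mode(workflow_id: "wf-1")
```

Calling the helper with no arguments raises ArgumentError, which matches the documented behavior of bulk_delete. Passing only statuses would also raise, since that field is ignored and does not count as a filter.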

#bulk_update(updates) ⇒ Object

Bulk reschedule scheduled workflow runs

Examples:

scheduled_client.bulk_update([
  { id: "scheduled-1", trigger_at: Time.now + 3600 },
  { id: "scheduled-2", trigger_at: Time.now + 7200 }
])

Parameters:

  • updates (Array<Hash>)

    Array of hashes with :id and :trigger_at keys

Returns:

  • (Object)

    The bulk update response containing updated IDs and per-item errors

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 146

def bulk_update(updates)
  update_items = updates.map do |u|
    HatchetSdkRest::ScheduledWorkflowsBulkUpdateItem.new(
      id: u[:id],
      trigger_at: u[:trigger_at].utc.iso8601,
    )
  end

  request = HatchetSdkRest::ScheduledWorkflowsBulkUpdateRequest.new(
    updates: update_items,
  )

  @workflow_api.workflow_scheduled_bulk_update(@config.tenant_id, request)
end
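
Each trigger_at is serialized with utc.iso8601, so the values must be Time objects (a String would raise NoMethodError). The sketch below shows what that serialization produces; staggered_updates is a hypothetical helper for building the updates array and is not part of the SDK.

```ruby
require "time" # provides Time#iso8601

# Hypothetical helper: build the updates array for bulk_update,
# spacing each run `step` seconds after the previous one.
def staggered_updates(ids, start_at:, step: 60)
  ids.each_with_index.map do |id, i|
    { id: id, trigger_at: start_at + (i * step) }
  end
end

start_at = Time.new(2025, 1, 1, 12, 0, 0, "+02:00")
updates = staggered_updates(["scheduled-1", "scheduled-2"], start_at: start_at)

# The same conversion bulk_update applies to every item.
serialized = updates.map { |u| u[:trigger_at].utc.iso8601 }
# serialized == ["2025-01-01T10:00:00Z", "2025-01-01T10:01:00Z"]
```

Note that Time#utc converts the receiver in place, so the Time objects passed in are switched to UTC as a side effect.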

#create(workflow_name:, trigger_at:, input: {}, additional_metadata: {}) ⇒ Object

Create a new scheduled workflow run

IMPORTANT: It’s preferable to use Workflow.run to trigger workflows if possible. This method is intended to be an escape hatch.

Examples:

scheduled = scheduled_client.create(
  workflow_name: "my-workflow",
  trigger_at: Time.now + 3600,
  input: { key: "value" },
  additional_metadata: { source: "api" }
)

Parameters:

  • workflow_name (String)

    The name of the workflow to schedule (namespace will be applied)

  • trigger_at (Time)

    The datetime when the run should be triggered

  • input (Hash) (defaults to: {})

    The input data for the scheduled workflow

  • additional_metadata (Hash) (defaults to: {})

    Additional metadata associated with the future run

Returns:

  • (Object)

    The created scheduled workflow instance

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 53

def create(workflow_name:, trigger_at:, input: {}, additional_metadata: {})
  request = HatchetSdkRest::ScheduleWorkflowRunRequest.new(
    trigger_at: trigger_at.utc.iso8601,
    input: input,
    additional_metadata: additional_metadata,
  )

  @workflow_run_api.scheduled_workflow_run_create(
    @config.tenant_id,
    @config.apply_namespace(workflow_name),
    request,
  )
end

#delete(scheduled_id) ⇒ void

This method returns an undefined value.

Delete a scheduled workflow run by its ID

Examples:

scheduled_client.delete("scheduled-123")

Parameters:

  • scheduled_id (String)

    The ID of the scheduled workflow run to delete

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 74

def delete(scheduled_id)
  @workflow_api.workflow_scheduled_delete(@config.tenant_id, scheduled_id)
end

#get(scheduled_id) ⇒ Object

Retrieve a specific scheduled workflow by ID

Examples:

scheduled = scheduled_client.get("scheduled-123")

Parameters:

  • scheduled_id (String)

    The scheduled workflow trigger ID to retrieve

Returns:

  • (Object)

    The requested scheduled workflow instance

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 199

def get(scheduled_id)
  @workflow_api.workflow_scheduled_get(@config.tenant_id, scheduled_id)
end

#list(offset: nil, limit: nil, workflow_id: nil, parent_workflow_run_id: nil, statuses: nil, additional_metadata: nil, order_by_field: nil, order_by_direction: nil) ⇒ Object

List scheduled workflows based on provided filters

Examples:

scheduled = scheduled_client.list(limit: 10, workflow_id: "wf-1")

Parameters:

  • offset (Integer, nil) (defaults to: nil)

    The offset to use in pagination

  • limit (Integer, nil) (defaults to: nil)

    The maximum number of scheduled workflows to return

  • workflow_id (String, nil) (defaults to: nil)

    The ID of the workflow to filter by

  • parent_workflow_run_id (String, nil) (defaults to: nil)

    The ID of the parent workflow run to filter by

  • statuses (Array<String>, nil) (defaults to: nil)

    A list of statuses to filter by

  • additional_metadata (Hash, nil) (defaults to: nil)

    Additional metadata to filter by

  • order_by_field (String, nil) (defaults to: nil)

    The field to order the results by

  • order_by_direction (String, nil) (defaults to: nil)

    The direction to order the results by

Returns:

  • (Object)

    A list of scheduled workflows matching the provided filters

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 175

def list(offset: nil, limit: nil, workflow_id: nil, parent_workflow_run_id: nil,
         statuses: nil, additional_metadata: nil, order_by_field: nil, order_by_direction: nil)
  @workflow_api.workflow_scheduled_list(
    @config.tenant_id,
    {
      offset: offset,
      limit: limit,
      order_by_field: order_by_field,
      order_by_direction: order_by_direction,
      workflow_id: workflow_id,
      additional_metadata: additional_metadata, # assumed: passed through as given
      parent_workflow_run_id: parent_workflow_run_id,
      statuses: statuses,
    },
  )
end
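
The offset and limit parameters allow paging through results. The following is a rough sketch of one way to do that. The shape of the list response is not documented here, so the caller supplies a block that extracts the rows from each page. all_scheduled and FakeScheduledClient are hypothetical names used only for this illustration.

```ruby
# Hypothetical helper: collect every result by advancing offset
# until a page comes back shorter than page_size.
def all_scheduled(client, page_size: 50, **filters)
  results = []
  offset = 0
  loop do
    page = client.list(offset: offset, limit: page_size, **filters)
    rows = yield(page) # caller extracts the rows from the response
    results.concat(rows)
    break if rows.size < page_size # short page: nothing left to fetch
    offset += page_size
  end
  results
end

# Stand-in client for demonstration only; it returns plain arrays.
class FakeScheduledClient
  def initialize(items)
    @items = items
  end

  def list(offset: nil, limit: nil, **_filters)
    @items.drop(offset || 0).first(limit || @items.size)
  end
end

client = FakeScheduledClient.new((1..5).map { |n| "scheduled-#{n}" })
all = all_scheduled(client, page_size: 2) { |page| page }
```

With the real client, the block would return whatever collection the response object exposes rather than the page itself.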

#update(scheduled_id, trigger_at:) ⇒ Object

Reschedule a scheduled workflow run by its ID

Note: the server may reject rescheduling if the scheduled run has already triggered, or if it was created via code definition (not via API).

Examples:

scheduled_client.update("scheduled-123", trigger_at: Time.now + 7200)

Parameters:

  • scheduled_id (String)

    The ID of the scheduled workflow run to reschedule

  • trigger_at (Time)

    The new datetime when the run should be triggered

Returns:

  • (Object)

    The updated scheduled workflow instance

Raises:

  • (HatchetSdkRest::ApiError)

    If the API request fails

Since:

  • 0.1.0



# File 'lib/hatchet/features/scheduled.rb', line 89

def update(scheduled_id, trigger_at:)
  request = HatchetSdkRest::UpdateScheduledWorkflowRunRequest.new(
    trigger_at: trigger_at.utc.iso8601,
  )

  @workflow_api.workflow_scheduled_update(@config.tenant_id, scheduled_id, request)
end
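
Because the server may reject a reschedule, callers may want to handle the failure explicitly. The sketch below assumes a rejection surfaces as HatchetSdkRest::ApiError, the error class documented for bulk_delete. The module definition and RejectingClient are stand-ins so the example runs without the SDK or a server, and reschedule_or_nil is a hypothetical wrapper.

```ruby
# Stand-in for the error class from the generated REST client,
# defined here only so the sketch is self-contained.
module HatchetSdkRest
  class ApiError < StandardError; end
end

# Stand-in client that always rejects, simulating a run that has
# already triggered.
class RejectingClient
  def update(_scheduled_id, trigger_at:)
    raise HatchetSdkRest::ApiError, "scheduled run has already triggered"
  end
end

# Hypothetical wrapper: returns the updated run, or nil when the
# server rejects the reschedule.
def reschedule_or_nil(client, scheduled_id, trigger_at)
  client.update(scheduled_id, trigger_at: trigger_at)
rescue HatchetSdkRest::ApiError => e
  warn "Could not reschedule #{scheduled_id}: #{e.message}"
  nil
end

result = reschedule_or_nil(RejectingClient.new, "scheduled-123", Time.now + 7200)
```

Here result is nil and a message is written to standard error. Whether to swallow the error or let it propagate depends on the caller.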