Class: Hatchet::Workflow

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/workflow.rb

Overview

Represents a workflow definition with one or more tasks arranged in a DAG.

Examples:

Define a simple workflow

# Build a two-step DAG: step2 declares step1 as its parent and reads its output.
wf = hatchet.workflow(name: "MyWorkflow")
step1 = wf.task(:step1) do |_input, _ctx|
  { "value" => 42 }
end
wf.task(:step2, parents: [step1]) do |_input, ctx|
  { "result" => ctx.task_output(step1)["value"] + 1 }
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, on_events: [], on_crons: [], concurrency: nil, default_priority: nil, task_defaults: nil, default_filters: [], sticky: nil, client: nil) ⇒ Workflow

Returns a new instance of Workflow.

Parameters:

  • name (String)

    Workflow name

  • on_events (Array<String>) (defaults to: [])

    Event trigger keys

  • on_crons (Array<String>) (defaults to: [])

    Cron trigger expressions

  • concurrency (Array<ConcurrencyExpression>, ConcurrencyExpression, nil) (defaults to: nil)

    Workflow-level concurrency

  • default_priority (Integer, nil) (defaults to: nil)

    Default priority

  • task_defaults (Hash, nil) (defaults to: nil)

    Default task settings

  • default_filters (Array<DefaultFilter>) (defaults to: [])

    Default filters

  • sticky (Symbol, nil) (defaults to: nil)

    Sticky strategy

  • client (Hatchet::Client, nil) (defaults to: nil)

    The client



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/hatchet/workflow.rb', line 61

# Stores the workflow's configuration; no tasks are registered at construction time.
#
# @param name [String] workflow name
# @param on_events [Array<String>] event trigger keys
# @param on_crons [Array<String>] cron trigger expressions
# @param concurrency [Array<ConcurrencyExpression>, ConcurrencyExpression, nil] workflow-level concurrency
# @param default_priority [Integer, nil] default priority
# @param task_defaults [Hash, nil] default task settings
# @param default_filters [Array<DefaultFilter>] default filters
# @param sticky [Symbol, nil] sticky strategy
# @param client [Hatchet::Client, nil] the client
def initialize(
  name:,
  on_events: [],
  on_crons: [],
  concurrency: nil,
  default_priority: nil,
  task_defaults: nil,
  default_filters: [],
  sticky: nil,
  client: nil
)
  @name = name
  # Task registry; starts empty and is filled by #task.
  @tasks = {}
  @on_events = on_events
  @on_crons = on_crons
  @concurrency = concurrency
  @default_priority = default_priority
  @task_defaults = task_defaults
  @default_filters = default_filters
  @sticky = sticky
  @client = client
  # Lifecycle hooks are unset until #on_failure_task / #on_success_task are called.
  @on_failure = nil
  @on_success = nil
  # Left nil here; #id fills it in on first access.
  @id = nil
end

Instance Attribute Details

#client ⇒ Hatchet::Client? (readonly)

Returns the Hatchet client.

Returns:

  • (Hatchet::Client, nil)

    The Hatchet client



41
42
43
# File 'lib/hatchet/workflow.rb', line 41

# @return [Hatchet::Client, nil] the Hatchet client
def client = @client

#concurrency ⇒ Array<ConcurrencyExpression>, ... (readonly)

Returns the workflow-level concurrency.

Returns:

  • (Array<ConcurrencyExpression>, ConcurrencyExpression, nil)

    Workflow-level concurrency



26
27
28
# File 'lib/hatchet/workflow.rb', line 26

# @return [Array<ConcurrencyExpression>, ConcurrencyExpression, nil] workflow-level concurrency
def concurrency = @concurrency

#default_filters ⇒ Array<DefaultFilter> (readonly)

Returns the default filters for event triggers.

Returns:

  • (Array<DefaultFilter>)

    Default filters for event triggers



35
36
37
# File 'lib/hatchet/workflow.rb', line 35

# @return [Array<DefaultFilter>] default filters for event triggers
def default_filters = @default_filters

#default_priority ⇒ Integer? (readonly)

Returns the default priority for runs (1-4).

Returns:

  • (Integer, nil)

    Default priority for runs (1-4)



29
30
31
# File 'lib/hatchet/workflow.rb', line 29

# @return [Integer, nil] default priority for runs (1-4)
def default_priority = @default_priority

#id ⇒ String?

Get the workflow ID (UUID). If not already set, lazily resolves it by looking up the workflow by name via the REST API.

Returns:

  • (String, nil)

    The workflow UUID



91
92
93
# File 'lib/hatchet/workflow.rb', line 91

# Returns the workflow ID, resolving and caching it on first access.
#
# A nil or false result from +resolve_workflow_id+ is not cached, so the
# lookup is attempted again on the next call.
#
# @return [String, nil] the workflow UUID
def id
  return @id if @id

  @id = resolve_workflow_id
end

#name ⇒ String (readonly)

Returns the workflow name.

Returns:

  • (String)

    Workflow name



14
15
16
# File 'lib/hatchet/workflow.rb', line 14

# @return [String] workflow name
def name = @name

#on_crons ⇒ Array<String> (readonly)

Returns the cron expressions that trigger this workflow.

Returns:

  • (Array<String>)

    Cron expressions that trigger this workflow



23
24
25
# File 'lib/hatchet/workflow.rb', line 23

# @return [Array<String>] cron expressions that trigger this workflow
def on_crons = @on_crons

#on_events ⇒ Array<String> (readonly)

Returns the event keys that trigger this workflow.

Returns:

  • (Array<String>)

    Event keys that trigger this workflow



20
21
22
# File 'lib/hatchet/workflow.rb', line 20

# @return [Array<String>] event keys that trigger this workflow
def on_events = @on_events

#on_failure ⇒ Task? (readonly)

Returns the on_failure task.

Returns:

  • (Task, nil)

    The on_failure task



44
45
46
# File 'lib/hatchet/workflow.rb', line 44

# @return [Task, nil] the on_failure task
def on_failure = @on_failure

#on_success ⇒ Task? (readonly)

Returns the on_success task.

Returns:

  • (Task, nil)

    The on_success task



47
48
49
# File 'lib/hatchet/workflow.rb', line 47

# @return [Task, nil] the on_success task
def on_success = @on_success

#sticky ⇒ Symbol? (readonly)

Returns the sticky strategy (:soft, :hard).

Returns:

  • (Symbol, nil)

    Sticky strategy (:soft, :hard)



38
39
40
# File 'lib/hatchet/workflow.rb', line 38

# @return [Symbol, nil] sticky strategy (:soft, :hard)
def sticky = @sticky

#task_defaults ⇒ Hash? (readonly)

Returns the default task settings.

Returns:

  • (Hash, nil)

    Default task settings



32
33
34
# File 'lib/hatchet/workflow.rb', line 32

# @return [Hash, nil] default task settings
def task_defaults = @task_defaults

#tasks ⇒ Hash<Symbol, Task> (readonly)

Returns the map of task name to Task object.

Returns:

  • (Hash<Symbol, Task>)

    Map of task name to Task object



17
18
19
# File 'lib/hatchet/workflow.rb', line 17

# @return [Hash<Symbol, Task>] map of task name to Task object
def tasks = @tasks

Instance Method Details

#create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash

Create a bulk run item for use with run_many

Parameters:

  • input (Hash) (defaults to: {})

    Input data

  • key (String, nil) (defaults to: nil)

    Deduplication key

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (Hash)

    Bulk run item



263
264
265
266
267
268
# File 'lib/hatchet/workflow.rb', line 263

# Builds one bulk run item for use with run_many.
#
# +:input+ is always present; +:key+ and +:options+ are included only when
# their values are truthy.
#
# @param input [Hash] input data
# @param key [String, nil] deduplication key
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [Hash] bulk run item
def create_bulk_run_item(input: {}, key: nil, options: nil)
  optional_fields = { key: key, options: options }.select { |_field, value| value }
  { input: input }.merge(optional_fields)
end

#create_cron(cron_name, expression, input: {}) ⇒ Object

Create a cron trigger for this workflow

Parameters:

  • cron_name (String)

    Name for the cron

  • expression (String)

    Cron expression

  • input (Hash) (defaults to: {})

    Workflow input

Returns:

  • (Object)

    Cron result

Raises:

  • (Error)

    If no client is associated with this workflow



288
289
290
291
292
293
294
295
296
297
# File 'lib/hatchet/workflow.rb', line 288

# Creates a cron trigger for this workflow through the client's cron API.
#
# @param cron_name [String] name for the cron
# @param expression [String] cron expression
# @param input [Hash] workflow input
# @return [Object] whatever the client's +cron.create+ returns
# @raise [Error] if no client is associated with this workflow
def create_cron(cron_name, expression, input: {})
  raise Error, "No client associated with workflow #{@name}" unless @client

  cron_options = {
    workflow_name: @name,
    cron_name: cron_name,
    expression: expression,
    input: input,
  }
  @client.cron.create(**cron_options)
end

#durable_task(name, eviction_policy: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY, **opts) {|input, ctx| ... } ⇒ Task

Define a durable task within this workflow.

Parameters:

  • name (Symbol, String)

    Task name

  • eviction_policy (Hatchet::EvictionPolicy, nil) (defaults to: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY)

    Eviction policy for this durable task. Defaults to DEFAULT_DURABLE_TASK_EVICTION_POLICY (15-minute TTL, capacity-eviction enabled). Pass `nil` to disable eviction entirely for this task.

  • opts (Hash)

    Other Task options forwarded to #task.

Yields:

  • (input, ctx)

    The task execution block

Returns:

  • (Task)

    The created durable task



123
124
125
# File 'lib/hatchet/workflow.rb', line 123

# Defines a durable task by delegating to #task with +durable: true+.
#
# @param name [Symbol, String] task name
# @param eviction_policy [Hatchet::EvictionPolicy, nil] eviction policy forwarded to #task
# @param opts [Hash] other task options forwarded to #task; entries here
#   override the +durable+ flag set by this method
# @yield [input, ctx] the task execution block
# @return [Task] the created durable task
def durable_task(name, eviction_policy: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY, **opts, &block)
  durable_options = { durable: true, eviction_policy: eviction_policy }.merge(opts)
  task(name, **durable_options, &block)
end

#on_failure_task(**opts) {|input, ctx| ... } ⇒ Task

Define an on_failure task for this workflow

Parameters:

  • opts (Hash)

    Task options

Yields:

  • (input, ctx)

    The on_failure task block

Returns:

  • (Task)

    The created on_failure task



132
133
134
135
136
137
138
139
140
# File 'lib/hatchet/workflow.rb', line 132

# Defines the on_failure task for this workflow and stores it in @on_failure.
#
# @param opts [Hash] task options passed to Task.new; entries here override
#   the name/workflow/client values set by this method
# @yield [input, ctx] the on_failure task block
# @return [Task] the created on_failure task
def on_failure_task(**opts, &block)
  task_options = { name: :on_failure, workflow: self, client: @client }.merge(opts)
  @on_failure = Task.new(**task_options, &block)
end

#on_success_task(**opts) {|input, ctx| ... } ⇒ Task

Define an on_success task for this workflow

Parameters:

  • opts (Hash)

    Task options

Yields:

  • (input, ctx)

    The on_success task block

Returns:

  • (Task)

    The created on_success task



147
148
149
150
151
152
153
154
155
# File 'lib/hatchet/workflow.rb', line 147

# Defines the on_success task for this workflow and stores it in @on_success.
#
# @param opts [Hash] task options passed to Task.new; entries here override
#   the name/workflow/client values set by this method
# @yield [input, ctx] the on_success task block
# @return [Task] the created on_success task
def on_success_task(**opts, &block)
  task_options = { name: :on_success, workflow: self, client: @client }.merge(opts)
  @on_success = Task.new(**task_options, &block)
end

#run(input = {}, options: nil) ⇒ Hash

Run this workflow synchronously

Parameters:

  • input (Hash) (defaults to: {})

    Workflow input

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (Hash)

    The workflow run output

Raises:

  • (Error)

    If no client is associated with this workflow



219
220
221
222
223
# File 'lib/hatchet/workflow.rb', line 219

# Runs this workflow synchronously via the client's admin API.
#
# @param input [Hash] workflow input
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [Hash] the workflow run output
# @raise [Error] if no client is associated with this workflow
def run(input = {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  admin = @client.admin
  admin.trigger_workflow(self, input, options: options)
end

#run_many(items, return_exceptions: false) ⇒ Array

Run many instances of this workflow in bulk

Parameters:

  • items (Array<Hash>)

    Bulk run items

  • return_exceptions (Boolean) (defaults to: false)

    Return exceptions instead of raising

Returns:

  • (Array)

    Results

Raises:

  • (Error)

    If no client is associated with this workflow



241
242
243
244
245
# File 'lib/hatchet/workflow.rb', line 241

# Runs many instances of this workflow in bulk via the client's admin API.
#
# @param items [Array<Hash>] bulk run items
# @param return_exceptions [Boolean] return exceptions instead of raising
# @return [Array] results
# @raise [Error] if no client is associated with this workflow
def run_many(items, return_exceptions: false)
  raise Error, "No client associated with workflow #{@name}" unless @client

  admin = @client.admin
  admin.trigger_workflow_many(self, items, return_exceptions: return_exceptions)
end

#run_many_no_wait(items) ⇒ Array<WorkflowRunRef>

Run many instances without waiting for results

Parameters:

  • items (Array<Hash>)

    Bulk run items

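Returns:

  • (Array<WorkflowRunRef>)

    References to the triggered workflow runs

Raises:

  • (Error)

    If no client is associated with this workflow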



251
252
253
254
255
# File 'lib/hatchet/workflow.rb', line 251

# Triggers many instances of this workflow without waiting for results.
#
# @param items [Array<Hash>] bulk run items
# @return [Array<WorkflowRunRef>] whatever the admin API's
#   +trigger_workflow_many_no_wait+ returns
# @raise [Error] if no client is associated with this workflow
def run_many_no_wait(items)
  raise Error, "No client associated with workflow #{@name}" unless @client

  admin = @client.admin
  admin.trigger_workflow_many_no_wait(self, items)
end

#run_no_wait(input = {}, options: nil) ⇒ WorkflowRunRef

Run this workflow without waiting for the result

Parameters:

  • input (Hash) (defaults to: {})

    Workflow input

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

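Returns:

  • (WorkflowRunRef)

    Reference to the triggered workflow run

Raises:

  • (Error)

    If no client is associated with this workflow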



230
231
232
233
234
# File 'lib/hatchet/workflow.rb', line 230

# Triggers this workflow without waiting for the result.
#
# @param input [Hash] workflow input
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [WorkflowRunRef] whatever the admin API's +trigger_workflow_no_wait+ returns
# @raise [Error] if no client is associated with this workflow
def run_no_wait(input = {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  admin = @client.admin
  admin.trigger_workflow_no_wait(self, input, options: options)
end

#schedule(time, input: {}, options: nil) ⇒ Object

Schedule this workflow for future execution

Parameters:

  • time (Time)

    When to execute

  • input (Hash) (defaults to: {})

    Workflow input

  • options (ScheduleTriggerWorkflowOptions, nil) (defaults to: nil)

    Schedule options

Returns:

  • (Object)

    Schedule result

Raises:

  • (Error)

    If no client is associated with this workflow



276
277
278
279
280
# File 'lib/hatchet/workflow.rb', line 276

# Schedules this workflow for future execution via the client's admin API.
#
# @param time [Time] when to execute
# @param input [Hash] workflow input
# @param options [ScheduleTriggerWorkflowOptions, nil] schedule options
# @return [Object] schedule result
# @raise [Error] if no client is associated with this workflow
def schedule(time, input: {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  admin = @client.admin
  admin.schedule_workflow(self, time, input: input, options: options)
end

#task(name, **opts) {|input, ctx| ... } ⇒ Task

Define a task within this workflow

Parameters:

  • name (Symbol, String)

    Task name

  • opts (Hash)

    Task options (parents:, execution_timeout:, retries:, etc.)

Yields:

  • (input, ctx)

    The task execution block

Returns:

  • (Task)

    The created task



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/hatchet/workflow.rb', line 101

# Defines a task within this workflow and registers it in @tasks.
#
# The registry key is the created task's own +name+, so a later task with the
# same name replaces the earlier entry.
#
# @param name [Symbol, String] task name
# @param opts [Hash] task options (parents:, execution_timeout:, retries:, etc.)
# @yield [input, ctx] the task execution block
# @return [Task] the created task
def task(name, **opts, &block)
  task_options = { name: name, workflow: self, client: @client }.merge(opts)
  Task.new(**task_options, &block).tap { |created| @tasks[created.name] = created }
end

#to_proto(config) ⇒ V1::CreateWorkflowVersionRequest

Convert this workflow to a V1::CreateWorkflowVersionRequest protobuf message.

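Parameters:

  • config (Object)

    Configuration used to namespace the workflow name and event triggers (must respond to apply_namespace)

Returns:

  • (V1::CreateWorkflowVersionRequest)

    The protobuf request describing this workflow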



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/hatchet/workflow.rb', line 161

# Convert this workflow to a V1::CreateWorkflowVersionRequest protobuf message.
#
# The request always carries the name, event triggers, cron triggers and
# tasks. Concurrency, the on-failure task, the sticky strategy, the default
# priority and the default filters are only set when present.
#
# NOTE(review): @on_success is not serialized by this method — confirm it is
# handled elsewhere.
#
# @param config [Object] configuration object; must respond to +apply_namespace+
# @return [V1::CreateWorkflowVersionRequest]
def to_proto(config)
  # Tasks receive the namespaced, lower-cased workflow name.
  service_name = config.apply_namespace(@name.downcase)

  # Namespace event triggers. A nil @on_events is treated as "no triggers",
  # the same way @on_crons and @default_filters are handled below.
  event_triggers = (@on_events || []).map { |event_key| config.apply_namespace(event_key) }

  # Convert tasks to proto
  task_protos = @tasks.values.map { |t| t.to_proto(service_name, config: config) }

  # On-failure task
  on_failure_proto = @on_failure&.to_proto(service_name, config: config)

  # Build concurrency: a single expression goes in `concurrency`, several go
  # in `concurrency_arr`.
  concurrency_proto = nil
  concurrency_arr = []

  if @concurrency
    # Explicit Array check on purpose: Kernel#Array would call #to_a on a
    # single expression object that happens to respond to it.
    conc_list = @concurrency.is_a?(Array) ? @concurrency : [@concurrency]

    if conc_list.length == 1
      concurrency_proto = conc_list.first.to_proto
    else
      concurrency_arr = conc_list.map(&:to_proto)
    end
  end

  # Sticky strategy; values other than :soft / :hard map to nil and are omitted.
  sticky_proto = nil
  if @sticky
    sticky_map = { soft: :SOFT, hard: :HARD }
    sticky_proto = sticky_map[@sticky]
  end

  # Default filters
  filter_protos = (@default_filters || []).map(&:to_proto)

  args = {
    name: config.apply_namespace(@name),
    event_triggers: event_triggers,
    cron_triggers: @on_crons || [],
    tasks: task_protos,
  }

  # Optional fields are left off the request entirely when absent.
  args[:concurrency] = concurrency_proto if concurrency_proto
  args[:concurrency_arr] = concurrency_arr unless concurrency_arr.empty?
  args[:on_failure_task] = on_failure_proto if on_failure_proto
  args[:sticky] = sticky_proto if sticky_proto
  args[:default_priority] = @default_priority if @default_priority
  args[:default_filters] = filter_protos unless filter_protos.empty?

  ::V1::CreateWorkflowVersionRequest.new(**args)
end