Class: Hatchet::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/task.rb

Overview

Represents a task within a workflow (or a standalone task).

Tasks are the basic unit of work in Hatchet. They can be defined as part of a workflow or as standalone tasks. Each task has a block that executes the task logic, receiving the workflow input and a context object.

Examples:

Task defined in a workflow

step1 = workflow.task(:step1) { |input, ctx| { "result" => "done" } }

Standalone task

task = hatchet.task(name: "my_task") { |input, ctx| { "result" => "done" } }

Constant Summary collapse

# Lookup table from lower-case duration-unit symbols to their upper-case
# counterparts (e.g. :second => :SECOND). Frozen so it cannot be mutated.
DURATION_MAP = %i[second minute hour day week month year].to_h { |unit| [unit, unit.upcase] }.freeze
# Lookup table from lower-case comparator symbols to their upper-case
# counterparts (e.g. :less_than => :LESS_THAN). Frozen so it cannot be mutated.
COMPARATOR_MAP = %i[
  equal not_equal
  greater_than greater_than_or_equal
  less_than less_than_or_equal
].to_h { |comparator| [comparator, comparator.upcase] }.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, parents: [], execution_timeout: nil, schedule_timeout: nil, retries: nil, backoff_max_seconds: nil, backoff_factor: nil, rate_limits: [], concurrency: nil, desired_worker_labels: nil, wait_for: [], skip_if: [], durable: false, eviction_policy: nil, workflow: nil, client: nil, deps: nil, &block) ⇒ Task

Returns a new instance of Task.

Parameters:

  • name (Symbol, String)

    Task name

  • parents (Array<Task, Symbol>) (defaults to: [])

    Parent tasks

  • execution_timeout (Integer, nil) (defaults to: nil)

    Execution timeout in seconds

  • schedule_timeout (Integer, nil) (defaults to: nil)

    Schedule timeout in seconds

  • retries (Integer, nil) (defaults to: nil)

    Max retries

  • backoff_max_seconds (Integer, nil) (defaults to: nil)

    Max backoff seconds

  • backoff_factor (Float, nil) (defaults to: nil)

    Backoff multiplier

  • rate_limits (Array<RateLimit>) (defaults to: [])

    Rate limits

  • concurrency (Array<ConcurrencyExpression>, ConcurrencyExpression, nil) (defaults to: nil)

    Task-level concurrency

  • desired_worker_labels (Hash, nil) (defaults to: nil)

    Desired worker labels for scheduling

  • wait_for (Array) (defaults to: [])

    Wait conditions

  • skip_if (Array) (defaults to: [])

    Skip conditions

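  • durable (Boolean) (defaults to: false)

    Whether this is a durable task

  • eviction_policy (Hatchet::EvictionPolicy, nil) (defaults to: nil)

    Eviction policy for durable tasks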

  • workflow (Workflow, nil) (defaults to: nil)

    The owning workflow

  • client (Hatchet::Client, nil) (defaults to: nil)

    The client

  • deps (Hash, nil) (defaults to: nil)

    Dependency providers

  • block (Proc)

    The task execution block



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/hatchet/task.rb', line 100

# Build a task definition. Every option is stored verbatim in an instance
# variable of the same name, except +name+ (converted with +to_sym+) and the
# block (stored as +@fn+).
#
# @param name [Symbol, String] task name
# @param parents [Array<Task, Symbol>] parent tasks
# @param execution_timeout [Integer, nil] execution timeout in seconds
# @param schedule_timeout [Integer, nil] schedule timeout in seconds
# @param retries [Integer, nil] max retries
# @param backoff_max_seconds [Integer, nil] max backoff seconds
# @param backoff_factor [Float, nil] backoff multiplier
# @param rate_limits [Array<RateLimit>] rate limits
# @param concurrency [Array<ConcurrencyExpression>, ConcurrencyExpression, nil] task-level concurrency
# @param desired_worker_labels [Hash, nil] desired worker labels
# @param wait_for [Array] wait conditions
# @param skip_if [Array] skip conditions
# @param durable [Boolean] whether this is a durable task
# @param eviction_policy [Hatchet::EvictionPolicy, nil] eviction policy
# @param workflow [Workflow, nil] the owning workflow
# @param client [Hatchet::Client, nil] the client
# @param deps [Hash, nil] dependency providers
# @param block [Proc] the task execution block
def initialize(
  name:,
  parents: [],
  execution_timeout: nil,
  schedule_timeout: nil,
  retries: nil,
  backoff_max_seconds: nil,
  backoff_factor: nil,
  rate_limits: [],
  concurrency: nil,
  desired_worker_labels: nil,
  wait_for: [],
  skip_if: [],
  durable: false,
  eviction_policy: nil,
  workflow: nil,
  client: nil,
  deps: nil,
  &block
)
  # Identity and wiring
  @name = name.to_sym
  @workflow = workflow
  @client = client
  @deps = deps
  @parents = parents

  # Timeouts, retries and backoff
  @execution_timeout = execution_timeout
  @schedule_timeout = schedule_timeout
  @retries = retries
  @backoff_factor = backoff_factor
  @backoff_max_seconds = backoff_max_seconds

  # Scheduling constraints
  @rate_limits = rate_limits
  @concurrency = concurrency
  @desired_worker_labels = desired_worker_labels

  # Conditions
  @wait_for = wait_for
  @skip_if = skip_if

  # Durability
  @durable = durable
  @eviction_policy = eviction_policy

  # The block is kept exactly as given; no Proc-to-lambda conversion happens
  # here, so a bare `return` inside a non-lambda block may still raise
  # LocalJumpError when the task is called.
  # NOTE(review): confirm whether a lambda conversion was intended.
  @fn = block
end

Instance Attribute Details

#backoff_factor ⇒ Float? (readonly)

Returns Backoff factor between retries.

Returns:

  • (Float, nil)

    Backoff factor between retries



48
49
50
# File 'lib/hatchet/task.rb', line 48

# @return [Float, nil] backoff factor between retries
def backoff_factor = @backoff_factor

#backoff_max_seconds ⇒ Integer? (readonly)

Returns Maximum backoff seconds between retries.

Returns:

  • (Integer, nil)

    Maximum backoff seconds between retries



45
46
47
# File 'lib/hatchet/task.rb', line 45

# @return [Integer, nil] maximum backoff seconds between retries
def backoff_max_seconds = @backoff_max_seconds

#client ⇒ Hatchet::Client? (readonly)

Returns The Hatchet client.

Returns:

  • (Hatchet::Client, nil)

    The Hatchet client



78
79
80
# File 'lib/hatchet/task.rb', line 78

# @return [Hatchet::Client, nil] the Hatchet client
def client = @client

#concurrency ⇒ Array<ConcurrencyExpression>, ... (readonly)

Returns Task-level concurrency.

Returns:

  • (Array<ConcurrencyExpression>, ConcurrencyExpression, nil)

    Task-level concurrency



54
55
56
# File 'lib/hatchet/task.rb', line 54

# @return [Array<ConcurrencyExpression>, ConcurrencyExpression, nil] task-level concurrency
def concurrency = @concurrency

#deps ⇒ Hash? (readonly)

Returns Dependency providers.

Returns:

  • (Hash, nil)

    Dependency providers



81
82
83
# File 'lib/hatchet/task.rb', line 81

# @return [Hash, nil] dependency providers
def deps = @deps

#desired_worker_labels ⇒ Hash? (readonly)

Returns Desired worker labels for scheduling.

Returns:

  • (Hash, nil)

    Desired worker labels for scheduling



57
58
59
# File 'lib/hatchet/task.rb', line 57

# @return [Hash, nil] desired worker labels for scheduling
def desired_worker_labels = @desired_worker_labels

#durable ⇒ Boolean (readonly)

Returns Whether this is a durable task.

Returns:

  • (Boolean)

    Whether this is a durable task



66
67
68
# File 'lib/hatchet/task.rb', line 66

# @return [Boolean] whether this is a durable task
def durable = @durable

#eviction_policy ⇒ Hatchet::EvictionPolicy? (readonly)

Returns Eviction policy for durable tasks.

Returns:

  • (Hatchet::EvictionPolicy, nil)

    Eviction policy for durable tasks



69
70
71
# File 'lib/hatchet/task.rb', line 69

# @return [Hatchet::EvictionPolicy, nil] eviction policy for durable tasks
def eviction_policy = @eviction_policy

#execution_timeout ⇒ Integer? (readonly)

Returns Execution timeout in seconds.

Returns:

  • (Integer, nil)

    Execution timeout in seconds



36
37
38
# File 'lib/hatchet/task.rb', line 36

# @return [Integer, nil] execution timeout in seconds
def execution_timeout = @execution_timeout

#fn ⇒ Proc? (readonly)

Returns The task execution block.

Returns:

  • (Proc, nil)

    The task execution block



72
73
74
# File 'lib/hatchet/task.rb', line 72

# @return [Proc, nil] the task execution block
def fn = @fn

#name ⇒ Symbol, String (readonly)

Returns Task name.

Returns:

  • (Symbol, String)

    Task name



30
31
32
# File 'lib/hatchet/task.rb', line 30

# @return [Symbol, String] task name
def name = @name

#parents ⇒ Array<Task, Symbol> (readonly)

Returns Parent task references.

Returns:

  • (Array<Task, Symbol>)

    Parent task references



33
34
35
# File 'lib/hatchet/task.rb', line 33

# @return [Array<Task, Symbol>] parent task references
def parents = @parents

#rate_limits ⇒ Array<RateLimit> (readonly)

Returns Rate limits applied to this task.

Returns:

  • (Array<RateLimit>)

    Rate limits applied to this task



51
52
53
# File 'lib/hatchet/task.rb', line 51

# @return [Array<RateLimit>] rate limits applied to this task
def rate_limits = @rate_limits

#retries ⇒ Integer? (readonly)

Returns Maximum number of retries.

Returns:

  • (Integer, nil)

    Maximum number of retries



42
43
44
# File 'lib/hatchet/task.rb', line 42

# @return [Integer, nil] maximum number of retries
def retries = @retries

#schedule_timeout ⇒ Integer? (readonly)

Returns Schedule timeout in seconds.

Returns:

  • (Integer, nil)

    Schedule timeout in seconds



39
40
41
# File 'lib/hatchet/task.rb', line 39

# @return [Integer, nil] schedule timeout in seconds
def schedule_timeout = @schedule_timeout

#skip_if ⇒ Array (readonly)

Returns Skip-if conditions.

Returns:

  • (Array)

    Skip-if conditions



63
64
65
# File 'lib/hatchet/task.rb', line 63

# @return [Array] skip-if conditions
def skip_if = @skip_if

#wait_for ⇒ Array (readonly)

Returns Wait-for conditions.

Returns:

  • (Array)

    Wait-for conditions



60
61
62
# File 'lib/hatchet/task.rb', line 60

# @return [Array] wait-for conditions
def wait_for = @wait_for

#workflow ⇒ Workflow? (readonly)

Returns The owning workflow.

Returns:

  • (Workflow, nil)

    The owning workflow



75
76
77
# File 'lib/hatchet/task.rb', line 75

# @return [Workflow, nil] the owning workflow
def workflow = @workflow

Instance Method Details

#call(input, context) ⇒ Object

Execute the task with the given input and context

Parameters:

  • input (Hash)

    Task input

  • context (Context)

    Task context

Returns:

  • (Object)

    Task output

Raises:

  • (Error)

    If no block is defined for the task



146
147
148
149
150
# File 'lib/hatchet/task.rb', line 146

# Invoke the stored task block with the given input and context.
#
# @param input [Hash] task input
# @param context [Context] task context
# @return [Object] whatever the task block returns
# @raise [Error] if the task has no block
def call(input, context)
  return @fn.call(input, context) if @fn

  raise Error, "No block defined for task #{@name}"
end

#create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash

Create a bulk run item for use with run_many

Parameters:

  • input (Hash) (defaults to: {})

    Input data

  • key (String, nil) (defaults to: nil)

    Deduplication key

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (Hash)

    Bulk run item



260
261
262
263
264
265
# File 'lib/hatchet/task.rb', line 260

# Build one bulk run item hash for use with #run_many.
#
# +:input+ is always present; +:key+ and +:options+ are only included when
# the corresponding argument is truthy.
#
# @param input [Hash] input data
# @param key [String, nil] deduplication key
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [Hash] bulk run item
def create_bulk_run_item(input: {}, key: nil, options: nil)
  extras = { key: key, options: options }.select { |_field, value| value }
  { input: input }.merge(extras)
end

#id ⇒ String

Returns The workflow ID (for API calls).

Returns:

  • (String)

    The workflow ID (for API calls)



289
290
291
# File 'lib/hatchet/task.rb', line 289

# Identifier used for API calls: the owning workflow's id when there is a
# workflow and it reports one, otherwise this task's name as a string.
#
# @return [String] the workflow ID (for API calls)
def id
  workflow_id = @workflow.id unless @workflow.nil?
  workflow_id || @name.to_s
end

#mock_run(input:, additional_metadata: {}, retry_count: 0, parent_outputs: {}) ⇒ Object

Execute task in unit test mode with mocked context

Parameters:

  • input (Hash)

    Task input

  • additional_metadata (Hash) (defaults to: {})

    Metadata for the context

  • retry_count (Integer) (defaults to: 0)

    Simulated retry count

  • parent_outputs (Hash) (defaults to: {})

    Mocked parent task outputs

Returns:

  • (Object)

    Task output



274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/hatchet/task.rb', line 274

# Execute the task in unit-test mode: builds a Context with fixed mock run
# ids, no action and no client, then calls the task with it.
#
# @param input [Hash] task input
# @param additional_metadata [Hash] metadata for the context
# @param retry_count [Integer] simulated retry count
# @param parent_outputs [Hash] mocked parent task outputs
# @return [Object] task output
def mock_run(input:, additional_metadata: {}, retry_count: 0, parent_outputs: {})
  ctx = Context.new(
    workflow_run_id: "mock-run-id",
    step_run_id: "mock-step-run-id",
    action: nil,
    client: nil,
    # Passed explicitly, like the neighbouring arguments, rather than relying
    # on the value-less `additional_metadata: ,` shorthand form.
    additional_metadata: additional_metadata,
    retry_count: retry_count,
    parent_outputs: parent_outputs,
  )

  call(input, ctx)
end

#run(input = {}, options: nil) ⇒ Hash

Run this task (or its owning workflow) synchronously.

For standalone tasks the result is automatically unwrapped so that the caller receives the task output directly (e.g. `{"result" => "done"}`) rather than the workflow-level output keyed by task name (e.g. `{"my_task" => {"result" => "done"}}`).

Parameters:

  • input (Hash) (defaults to: {})

    Input data

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (Hash)

    The task output

Raises:

  • (Error)

    If no client is associated with the task



207
208
209
210
211
212
213
# File 'lib/hatchet/task.rb', line 207

# Trigger this task (or its owning workflow, when it has one) through the
# client's admin interface and return the result after passing it through
# extract_result.
#
# @param input [Hash] input data
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [Hash] the task output
# @raise [Error] if no client is associated with the task
def run(input = {}, options: nil)
  raise Error, "No client associated with task #{@name}" unless effective_client

  admin = effective_client.admin
  extract_result(admin.trigger_workflow(@workflow || self, input, options: options))
end

#run_many(items, return_exceptions: false) ⇒ Array

Run many instances of this task in bulk

Parameters:

  • items (Array<Hash>)

    Bulk run items

  • return_exceptions (Boolean) (defaults to: false)

    Whether to return exceptions instead of raising

Returns:

  • (Array)

    Results (each unwrapped to the task output)

Raises:

  • (Error)

    If no client is associated with the task



236
237
238
239
240
241
# File 'lib/hatchet/task.rb', line 236

# Trigger many runs in bulk and return their results. Entries that come back
# as Exception instances are returned untouched; every other entry is passed
# through extract_result.
#
# NOTE(review): this passes +self+ to the admin client, whereas #run passes
# +@workflow || self+ — confirm the difference is intended.
#
# @param items [Array<Hash>] bulk run items
# @param return_exceptions [Boolean] whether to return exceptions instead of raising
# @return [Array] results (each unwrapped to the task output)
# @raise [Error] if no client is associated with the task
def run_many(items, return_exceptions: false)
  raise Error, "No client associated with task #{@name}" unless effective_client

  admin = effective_client.admin
  outcomes = admin.trigger_workflow_many(self, items, return_exceptions: return_exceptions)
  outcomes.map do |outcome|
    next outcome if outcome.is_a?(Exception)

    extract_result(outcome)
  end
end

#run_many_no_wait(items) ⇒ Array<TaskRunRef>

Run many instances without waiting for results

Parameters:

  • items (Array<Hash>)

    Bulk run items

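Returns:

  • (Array<TaskRunRef>)

    Run references for the triggered runs

Raises:

  • (Error)

    If no client is associated with the task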



247
248
249
250
251
252
# File 'lib/hatchet/task.rb', line 247

# Trigger many runs in bulk without waiting, wrapping each reference returned
# by the admin client in a TaskRunRef tagged with this task's name.
#
# NOTE(review): this passes +self+ to the admin client, whereas #run_no_wait
# passes +@workflow || self+ — confirm the difference is intended.
#
# @param items [Array<Hash>] bulk run items
# @return [Array<TaskRunRef>] run references
# @raise [Error] if no client is associated with the task
def run_many_no_wait(items)
  raise Error, "No client associated with task #{@name}" unless effective_client

  admin = effective_client.admin
  admin.trigger_workflow_many_no_wait(self, items).map do |workflow_ref|
    TaskRunRef.new(workflow_run_ref: workflow_ref, task_name: @name)
  end
end

#run_no_wait(input = {}, options: nil) ⇒ TaskRunRef

Run this task without waiting for the result.

Returns a Hatchet::TaskRunRef whose result method automatically unwraps the task output, matching the behaviour of #run.

Parameters:

  • input (Hash) (defaults to: {})

    Input data

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (TaskRunRef)

    Reference to the triggered run

Raises:

  • (Error)

    If no client is associated with the task



223
224
225
226
227
228
229
# File 'lib/hatchet/task.rb', line 223

# Trigger this task (or its owning workflow, when it has one) without waiting
# and wrap the returned reference in a TaskRunRef tagged with this task's name.
#
# @param input [Hash] input data
# @param options [TriggerWorkflowOptions, nil] trigger options
# @return [TaskRunRef] reference to the triggered run
# @raise [Error] if no client is associated with the task
def run_no_wait(input = {}, options: nil)
  raise Error, "No client associated with task #{@name}" unless effective_client

  admin = effective_client.admin
  workflow_ref = admin.trigger_workflow_no_wait(@workflow || self, input, options: options)
  TaskRunRef.new(workflow_run_ref: workflow_ref, task_name: @name)
end

#to_proto(service_name, config: nil) ⇒ V1::CreateTaskOpts

Convert this task to a V1::CreateTaskOpts protobuf message.

Parameters:

  • service_name (String)

    The workflow service name (namespaced)

  • config (Hatchet::Config, nil) (defaults to: nil)

    Config for namespace resolution in conditions

Returns:

  • (V1::CreateTaskOpts)

    The protobuf message for this task



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/hatchet/task.rb', line 157

# Build a V1::CreateTaskOpts message from this task's settings.
#
# Always-present fields: readable_id, action ("<service_name>:<name>"),
# inputs ("{}"), parents, retries (0 when unset) and is_durable. The remaining
# fields are added only when the corresponding setting is present.
#
# @param service_name [String] the workflow service name (namespaced)
# @param config [Hatchet::Config, nil] forwarded to conditions_to_proto
# @return [V1::CreateTaskOpts]
def to_proto(service_name, config: nil)
  task_opts = {
    readable_id: @name.to_s,
    action: "#{service_name}:#{@name}",
    inputs: "{}",
    parents: parent_names,
    retries: @retries || 0,
  }

  # Execution and schedule timeouts, each run through duration_to_expr
  if @execution_timeout
    task_opts[:timeout] = duration_to_expr(@execution_timeout)
  end
  if @schedule_timeout
    task_opts[:schedule_timeout] = duration_to_expr(@schedule_timeout)
  end

  # Rate limits and worker labels are skipped when absent or empty
  if @rate_limits && !@rate_limits.empty?
    task_opts[:rate_limits] = @rate_limits.map { |limit| rate_limit_to_proto(limit) }
  end
  if @desired_worker_labels && !@desired_worker_labels.empty?
    task_opts[:worker_labels] = build_worker_labels_map(@desired_worker_labels)
  end

  # Backoff settings
  if @backoff_factor
    task_opts[:backoff_factor] = @backoff_factor
  end
  if @backoff_max_seconds
    task_opts[:backoff_max_seconds] = @backoff_max_seconds
  end

  # Task-level concurrency: a single expression is treated as a one-element list
  if @concurrency
    expressions = @concurrency.is_a?(Array) ? @concurrency : [@concurrency]
    task_opts[:concurrency] = expressions.map(&:to_proto)
  end

  # Conditions are included only when conditions_to_proto returns something truthy
  conditions = conditions_to_proto(config)
  if conditions
    task_opts[:conditions] = conditions
  end

  task_opts[:is_durable] = @durable

  ::V1::CreateTaskOpts.new(**task_opts)
end