Class: Hatchet::Task
- Inherits: Object
- Defined in: lib/hatchet/task.rb
Overview
Represents a task within a workflow (or a standalone task).
Tasks are the basic unit of work in Hatchet. They can be defined as part of a workflow or as standalone tasks. Each task has a block that executes the task logic, receiving the workflow input and a context object.
Constant Summary
- DURATION_MAP =
    {
      second: :SECOND,
      minute: :MINUTE,
      hour: :HOUR,
      day: :DAY,
      week: :WEEK,
      month: :MONTH,
      year: :YEAR,
    }.freeze
- COMPARATOR_MAP =
    {
      equal: :EQUAL,
      not_equal: :NOT_EQUAL,
      greater_than: :GREATER_THAN,
      greater_than_or_equal: :GREATER_THAN_OR_EQUAL,
      less_than: :LESS_THAN,
      less_than_or_equal: :LESS_THAN_OR_EQUAL,
    }.freeze
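How these maps are consumed is not shown on this page. As a minimal sketch, assuming they translate Ruby-style symbols into enum-style names, a lookup through `fetch` would reject unknown units early. The `duration_unit` helper is hypothetical and not part of the SDK.

```ruby
# Copy of the documented constant, reproduced so the sketch runs standalone.
DURATION_MAP = {
  second: :SECOND, minute: :MINUTE, hour: :HOUR, day: :DAY,
  week: :WEEK, month: :MONTH, year: :YEAR,
}.freeze

# Hypothetical helper: translate a Ruby-style unit into the enum-style symbol,
# raising a descriptive error for anything the map does not know.
def duration_unit(unit)
  DURATION_MAP.fetch(unit.to_sym) do
    raise ArgumentError, "unknown duration unit: #{unit.inspect}"
  end
end

duration_unit(:minute)  # => :MINUTE
duration_unit("week")   # => :WEEK
```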
Instance Attribute Summary
- #backoff_factor ⇒ Float? (readonly)
  Backoff factor between retries.
- #backoff_max_seconds ⇒ Integer? (readonly)
  Maximum backoff seconds between retries.
- #client ⇒ Hatchet::Client? (readonly)
  The Hatchet client.
- #concurrency ⇒ Array<ConcurrencyExpression>, ... (readonly)
  Task-level concurrency.
- #deps ⇒ Hash? (readonly)
  Dependency providers.
- #desired_worker_labels ⇒ Hash? (readonly)
  Desired worker labels for scheduling.
- #durable ⇒ Boolean (readonly)
  Whether this is a durable task.
- #eviction_policy ⇒ Hatchet::EvictionPolicy? (readonly)
  Eviction policy for durable tasks.
- #execution_timeout ⇒ Integer? (readonly)
  Execution timeout in seconds.
- #fn ⇒ Proc? (readonly)
  The task execution block.
- #name ⇒ Symbol, String (readonly)
  Task name.
- #parents ⇒ Array<Task, Symbol> (readonly)
  Parent task references.
- #rate_limits ⇒ Array<RateLimit> (readonly)
  Rate limits applied to this task.
- #retries ⇒ Integer? (readonly)
  Maximum number of retries.
- #schedule_timeout ⇒ Integer? (readonly)
  Schedule timeout in seconds.
- #skip_if ⇒ Array (readonly)
  Skip-if conditions.
- #wait_for ⇒ Array (readonly)
  Wait-for conditions.
- #workflow ⇒ Workflow? (readonly)
  The owning workflow.
Instance Method Summary
- #call(input, context) ⇒ Object
  Execute the task with the given input and context.
- #create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash
  Create a bulk run item for use with run_many.
- #id ⇒ String
  The workflow ID (for API calls).
- #initialize(name:, parents: [], execution_timeout: nil, schedule_timeout: nil, retries: nil, backoff_max_seconds: nil, backoff_factor: nil, rate_limits: [], concurrency: nil, desired_worker_labels: nil, wait_for: [], skip_if: [], durable: false, eviction_policy: nil, workflow: nil, client: nil, deps: nil, &block) ⇒ Task (constructor)
  A new instance of Task.
- #mock_run(input:, additional_metadata: {}, retry_count: 0, parent_outputs: {}) ⇒ Object
  Execute task in unit test mode with mocked context.
- #run(input = {}, options: nil) ⇒ Hash
  Run this task (or its owning workflow) synchronously.
- #run_many(items, return_exceptions: false) ⇒ Array
  Run many instances of this task in bulk.
- #run_many_no_wait(items) ⇒ Array<TaskRunRef>
  Run many instances without waiting for results.
- #run_no_wait(input = {}, options: nil) ⇒ TaskRunRef
  Run this task without waiting for the result.
- #to_proto(service_name, config: nil) ⇒ V1::CreateTaskOpts
  Convert this task to a V1::CreateTaskOpts protobuf message.
Constructor Details
#initialize(name:, parents: [], execution_timeout: nil, schedule_timeout: nil, retries: nil, backoff_max_seconds: nil, backoff_factor: nil, rate_limits: [], concurrency: nil, desired_worker_labels: nil, wait_for: [], skip_if: [], durable: false, eviction_policy: nil, workflow: nil, client: nil, deps: nil, &block) ⇒ Task
Returns a new instance of Task.
    # File 'lib/hatchet/task.rb', line 100
    def initialize(
      name:,
      parents: [],
      execution_timeout: nil,
      schedule_timeout: nil,
      retries: nil,
      backoff_max_seconds: nil,
      backoff_factor: nil,
      rate_limits: [],
      concurrency: nil,
      desired_worker_labels: nil,
      wait_for: [],
      skip_if: [],
      durable: false,
      eviction_policy: nil,
      workflow: nil,
      client: nil,
      deps: nil,
      &block
    )
      @name = name.to_sym
      @parents = parents
      @execution_timeout = execution_timeout
      @schedule_timeout = schedule_timeout
      @retries = retries
      @backoff_max_seconds = backoff_max_seconds
      @backoff_factor = backoff_factor
      @rate_limits = rate_limits
      @concurrency = concurrency
      @desired_worker_labels = desired_worker_labels
      @wait_for = wait_for
      @skip_if = skip_if
      @durable = durable
      @eviction_policy = eviction_policy
      @workflow = workflow
      @client = client
      @deps = deps
      # Convert Proc to lambda to avoid LocalJumpError on bare `return`
      @fn = block
    end
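The comment in the constructor refers to a general Ruby pitfall: a plain proc that uses a bare `return` raises `LocalJumpError` once the method that created it has finished. The sketch below illustrates that pitfall and one way to obtain lambda-style `return` semantics. It is not the SDK's implementation; `build_block` and `lambda_from` are hypothetical names introduced only for this example.

```ruby
# A proc that uses a bare `return`, created inside a method that has
# already finished by the time the proc is called.
def build_block
  proc { |input, _ctx| return input * 2 }
end

# Hypothetical helper: give the block lambda-style `return` semantics by
# defining it as a singleton method and converting that method to a proc.
# Caveat: inside the block, `self` becomes the holder object.
def lambda_from(block)
  holder = Object.new
  holder.define_singleton_method(:invoke, &block)
  holder.method(:invoke).to_proc
end

block = build_block

begin
  block.call(21, nil)
rescue LocalJumpError => e
  puts "plain proc raised #{e.class}"
end

safe = lambda_from(block)
puts safe.lambda?        # Method#to_proc yields a lambda
puts safe.call(21, nil)  # `return` now simply ends the call
```

The trade-off of this approach is the change of `self` inside the block, which matters if the block relies on methods from its defining scope.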
Instance Attribute Details
#backoff_factor ⇒ Float? (readonly)
Returns the backoff factor between retries.

    # File 'lib/hatchet/task.rb', line 48
    def backoff_factor
      @backoff_factor
    end
#backoff_max_seconds ⇒ Integer? (readonly)
Returns the maximum backoff, in seconds, between retries.

    # File 'lib/hatchet/task.rb', line 45
    def backoff_max_seconds
      @backoff_max_seconds
    end
#client ⇒ Hatchet::Client? (readonly)
Returns the Hatchet client.

    # File 'lib/hatchet/task.rb', line 78
    def client
      @client
    end
#concurrency ⇒ Array<ConcurrencyExpression>, ... (readonly)
Returns the task-level concurrency settings.

    # File 'lib/hatchet/task.rb', line 54
    def concurrency
      @concurrency
    end
#deps ⇒ Hash? (readonly)
Returns the dependency providers.

    # File 'lib/hatchet/task.rb', line 81
    def deps
      @deps
    end
#desired_worker_labels ⇒ Hash? (readonly)
Returns the desired worker labels used for scheduling.

    # File 'lib/hatchet/task.rb', line 57
    def desired_worker_labels
      @desired_worker_labels
    end
#durable ⇒ Boolean (readonly)
Returns whether this is a durable task.

    # File 'lib/hatchet/task.rb', line 66
    def durable
      @durable
    end
#eviction_policy ⇒ Hatchet::EvictionPolicy? (readonly)
Returns the eviction policy for durable tasks.

    # File 'lib/hatchet/task.rb', line 69
    def eviction_policy
      @eviction_policy
    end
#execution_timeout ⇒ Integer? (readonly)
Returns the execution timeout in seconds.

    # File 'lib/hatchet/task.rb', line 36
    def execution_timeout
      @execution_timeout
    end
#fn ⇒ Proc? (readonly)
Returns the task execution block.

    # File 'lib/hatchet/task.rb', line 72
    def fn
      @fn
    end
#name ⇒ Symbol, String (readonly)
Returns the task name.

    # File 'lib/hatchet/task.rb', line 30
    def name
      @name
    end
#parents ⇒ Array<Task, Symbol> (readonly)
Returns the parent task references.

    # File 'lib/hatchet/task.rb', line 33
    def parents
      @parents
    end
#rate_limits ⇒ Array<RateLimit> (readonly)
Returns the rate limits applied to this task.

    # File 'lib/hatchet/task.rb', line 51
    def rate_limits
      @rate_limits
    end
#retries ⇒ Integer? (readonly)
Returns the maximum number of retries.

    # File 'lib/hatchet/task.rb', line 42
    def retries
      @retries
    end
#schedule_timeout ⇒ Integer? (readonly)
Returns the schedule timeout in seconds.

    # File 'lib/hatchet/task.rb', line 39
    def schedule_timeout
      @schedule_timeout
    end
#skip_if ⇒ Array (readonly)
Returns the skip-if conditions.

    # File 'lib/hatchet/task.rb', line 63
    def skip_if
      @skip_if
    end
#wait_for ⇒ Array (readonly)
Returns the wait-for conditions.

    # File 'lib/hatchet/task.rb', line 60
    def wait_for
      @wait_for
    end
#workflow ⇒ Workflow? (readonly)
Returns the owning workflow.

    # File 'lib/hatchet/task.rb', line 75
    def workflow
      @workflow
    end
Instance Method Details
#call(input, context) ⇒ Object
Execute the task with the given input and context.

    # File 'lib/hatchet/task.rb', line 146
    def call(input, context)
      raise Error, "No block defined for task #{@name}" unless @fn

      @fn.call(input, context)
    end
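To make the block contract concrete, here is a minimal sketch using a stand-in class rather than the real Hatchet::Task. `MiniTask` is hypothetical; only the guard in `call` mirrors the listing above, and the `nil` context is a placeholder for whatever context object the SDK passes.

```ruby
# Stand-in for illustration only; this is not Hatchet::Task.
class MiniTask
  class Error < StandardError; end

  attr_reader :name, :fn

  def initialize(name:, &block)
    @name = name.to_sym
    @fn = block
  end

  # Same guard as the documented #call: fail loudly when no block was given.
  def call(input, context)
    raise Error, "No block defined for task #{@name}" unless @fn

    @fn.call(input, context)
  end
end

# The block receives the input first and the context second.
greet = MiniTask.new(name: "greet") do |input, _ctx|
  { "message" => "Hello, #{input["name"]}" }
end
puts greet.call({ "name" => "Ada" }, nil)["message"]

# A task created without a block raises as soon as it is called.
empty = MiniTask.new(name: :empty)
begin
  empty.call({}, nil)
rescue MiniTask::Error => e
  puts e.message
end
```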
#create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash
Create a bulk run item for use with run_many.

    # File 'lib/hatchet/task.rb', line 260
    def create_bulk_run_item(input: {}, key: nil, options: nil)
      item = { input: input }
      item[:key] = key if key
      item[:options] = options if options
      item
    end
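The shape of a bulk item can be shown without a client. The sketch below is a standalone copy of the item-building logic, assuming optional keys are added only when a value is supplied; `build_bulk_item` is a hypothetical name used so the example does not depend on the gem.

```ruby
# Standalone copy of the item-building logic for illustration.
def build_bulk_item(input: {}, key: nil, options: nil)
  item = { input: input }
  item[:key] = key if key              # omitted entirely when nil
  item[:options] = options if options  # omitted entirely when nil
  item
end

items = [
  build_bulk_item(input: { "n" => 1 }),
  build_bulk_item(input: { "n" => 2 }, key: "second"),
]
items.map(&:keys)  # => [[:input], [:input, :key]]
```

With the real class, a list built this way from #create_bulk_run_item is what #run_many and #run_many_no_wait take as `items`.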
#id ⇒ String
Returns the owning workflow's ID (for API calls), or the task name as a string when the task has no workflow.

    # File 'lib/hatchet/task.rb', line 289
    def id
      @workflow&.id || @name.to_s
    end
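The fallback relies on Ruby's safe-navigation operator. A minimal sketch with stand-ins (`FakeWorkflow` and `task_id` are hypothetical, introduced only for this example):

```ruby
# Stand-in workflow; only the `id` reader matters here.
FakeWorkflow = Struct.new(:id)

# `&.` returns nil when the receiver is nil, so `||` falls through
# to the task name.
def task_id(workflow, name)
  workflow&.id || name.to_s
end

task_id(FakeWorkflow.new("wf-123"), :resize)  # => "wf-123"
task_id(nil, :resize)                          # => "resize"
```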
#mock_run(input:, additional_metadata: {}, retry_count: 0, parent_outputs: {}) ⇒ Object
Execute the task in unit test mode with a mocked context.

    # File 'lib/hatchet/task.rb', line 274
    def mock_run(input:, additional_metadata: {}, retry_count: 0, parent_outputs: {})
      ctx = Context.new(
        workflow_run_id: "mock-run-id",
        step_run_id: "mock-step-run-id",
        action: nil,
        client: nil,
        additional_metadata: additional_metadata,
        retry_count: retry_count,
        parent_outputs: parent_outputs,
      )

      call(input, ctx)
    end
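The idea behind a mocked context can be sketched without the gem: the task body is called directly with a hand-built context, so no engine or client is involved. `MockContext` below is a stand-in, and its reader names (`retry_count`, `parent_outputs`) are assumptions taken from the keyword arguments above, since the real context API is not shown on this page.

```ruby
# Stand-in context for illustration; not the SDK's Context class.
MockContext = Struct.new(:additional_metadata, :retry_count, :parent_outputs,
                         keyword_init: true)

# A task body written as a lambda so it can be exercised directly.
summarize = lambda do |input, ctx|
  upstream = ctx.parent_outputs.fetch("fetch", {})
  {
    "total" => upstream.fetch("count", 0) + input.fetch("extra", 0),
    "attempt" => ctx.retry_count + 1,
  }
end

ctx = MockContext.new(
  additional_metadata: {},
  retry_count: 1,
  parent_outputs: { "fetch" => { "count" => 3 } },
)
result = summarize.call({ "extra" => 2 }, ctx)
puts result["total"]    # 3 from the parent output plus 2 from the input
puts result["attempt"]  # retry_count is zero-based, so this is attempt 2
```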
#run(input = {}, options: nil) ⇒ Hash
Run this task (or its owning workflow) synchronously.
For standalone tasks the result is automatically unwrapped, so the caller receives the task output directly (e.g. `{"result" => "done"}`) rather than the workflow-level output keyed by task name (e.g. `{"task_name" => {"result" => "done"}}`).
    # File 'lib/hatchet/task.rb', line 207
    def run(input = {}, options: nil)
      target = @workflow || self
      raise Error, "No client associated with task #{@name}" unless effective_client

      result = effective_client.admin.trigger_workflow(target, input, options: options)
      extract_result(result)
    end
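The unwrapping step can be pictured as follows. The SDK's private `extract_result` is not shown on this page, so `unwrap_result` is a hypothetical stand-in that only illustrates the described behaviour: pick the entry keyed by the task name when it exists, otherwise return the value unchanged.

```ruby
# Hypothetical unwrapping helper; the real extract_result may differ.
def unwrap_result(result, task_name)
  key = task_name.to_s
  return result[key] if result.is_a?(Hash) && result.key?(key)

  result
end

workflow_output = { "resize" => { "result" => "done" } }

# Workflow-level output keyed by task name is reduced to the task output.
unwrap_result(workflow_output, :resize)

# Output that is already unwrapped is returned as-is.
unwrap_result({ "result" => "done" }, :resize)
```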
#run_many(items, return_exceptions: false) ⇒ Array
Run many instances of this task in bulk.

    # File 'lib/hatchet/task.rb', line 236
    def run_many(items, return_exceptions: false)
      raise Error, "No client associated with task #{@name}" unless effective_client

      results = effective_client.admin.trigger_workflow_many(self, items, return_exceptions: return_exceptions)
      results.map { |r| r.is_a?(Exception) ? r : extract_result(r) }
    end
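The final `map` in the listing leaves exception objects in place and unwraps everything else. A minimal sketch of handling such a mixed array, using simulated results and a stand-in for the private unwrapping step (both are assumptions, not SDK output):

```ruby
# Stand-in for the SDK's private result unwrapping (assumed behaviour).
unwrap = ->(result) { result.fetch("resize", result) }

# Simulated bulk results: with return_exceptions: true, failures are assumed
# to arrive as exception objects alongside successful outputs.
raw = [
  { "resize" => { "ok" => true } },
  RuntimeError.new("boom"),
]

# Same shape as the mapping in the listing.
results = raw.map { |r| r.is_a?(Exception) ? r : unwrap.call(r) }

# Callers can then separate failures from successes.
failures, successes = results.partition { |r| r.is_a?(Exception) }
puts "#{successes.size} succeeded, #{failures.size} failed"
```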
#run_many_no_wait(items) ⇒ Array<TaskRunRef>
Run many instances without waiting for results.

    # File 'lib/hatchet/task.rb', line 247
    def run_many_no_wait(items)
      raise Error, "No client associated with task #{@name}" unless effective_client

      refs = effective_client.admin.trigger_workflow_many_no_wait(self, items)
      refs.map { |ref| TaskRunRef.new(workflow_run_ref: ref, task_name: @name) }
    end
#run_no_wait(input = {}, options: nil) ⇒ TaskRunRef
Run this task without waiting for the result.
Returns a Hatchet::TaskRunRef whose result method automatically unwraps the task output, matching the behaviour of #run.
    # File 'lib/hatchet/task.rb', line 223
    def run_no_wait(input = {}, options: nil)
      target = @workflow || self
      raise Error, "No client associated with task #{@name}" unless effective_client

      ref = effective_client.admin.trigger_workflow_no_wait(target, input, options: options)
      TaskRunRef.new(workflow_run_ref: ref, task_name: @name)
    end
#to_proto(service_name, config: nil) ⇒ V1::CreateTaskOpts
Convert this task to a V1::CreateTaskOpts protobuf message.
    # File 'lib/hatchet/task.rb', line 157
    def to_proto(service_name, config: nil)
      opts = {
        readable_id: @name.to_s,
        action: "#{service_name}:#{@name}",
        inputs: "{}",
        parents: parent_names,
        retries: @retries || 0,
      }

      # Timeout as duration string (e.g. "60s")
      opts[:timeout] = duration_to_expr(@execution_timeout) if @execution_timeout

      # Schedule timeout
      opts[:schedule_timeout] = duration_to_expr(@schedule_timeout) if @schedule_timeout

      # Rate limits
      opts[:rate_limits] = @rate_limits.map { |rl| rate_limit_to_proto(rl) } if @rate_limits && !@rate_limits.empty?

      # Worker labels
      opts[:worker_labels] = build_worker_labels_map(@desired_worker_labels) if @desired_worker_labels && !@desired_worker_labels.empty?

      # Backoff settings
      opts[:backoff_factor] = @backoff_factor if @backoff_factor
      opts[:backoff_max_seconds] = @backoff_max_seconds if @backoff_max_seconds

      # Task-level concurrency
      if @concurrency
        conc_list = @concurrency.is_a?(Array) ? @concurrency : [@concurrency]
        opts[:concurrency] = conc_list.map(&:to_proto)
      end

      # Conditions (wait_for, skip_if)
      conditions_proto = conditions_to_proto(config)
      opts[:conditions] = conditions_proto if conditions_proto

      opts[:is_durable] = @durable

      ::V1::CreateTaskOpts.new(**opts)
    end
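The pattern in the listing is to start from required fields and add optional ones only when they are set. The sketch below reproduces that pattern with plain hashes. `seconds_to_expr` is a hypothetical stand-in for the private `duration_to_expr` helper, whose actual behaviour is not shown on this page; the only documented hint is the "60s" example in the comment.

```ruby
# Hypothetical stand-in for the private duration helper.
def seconds_to_expr(value)
  return value if value.is_a?(String) # assume strings are already expressions

  "#{Integer(value)}s"
end

# Build options the way the listing does: optional keys appear only when set.
def build_opts(name:, retries: nil, execution_timeout: nil)
  opts = { readable_id: name.to_s, retries: retries || 0 }
  opts[:timeout] = seconds_to_expr(execution_timeout) if execution_timeout
  opts
end

build_opts(name: :resize)                        # no :timeout key at all
build_opts(name: :resize, execution_timeout: 60) # :timeout is "60s"
```

Leaving unset keys out of the hash, rather than passing nil, lets the receiving message keep its own defaults for those fields.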