Class: Hatchet::Workflow
- Inherits:
- Object
- Inheritance hierarchy:
- Object
- Hatchet::Workflow
- Defined in:
- lib/hatchet/workflow.rb
Overview
Represents a workflow definition with one or more tasks arranged in a DAG.
Instance Attribute Summary
-
#client ⇒ Hatchet::Client?
readonly
The Hatchet client.
-
#concurrency ⇒ Array<ConcurrencyExpression>, ...
readonly
Workflow-level concurrency.
-
#default_filters ⇒ Array<DefaultFilter>
readonly
Default filters for event triggers.
-
#default_priority ⇒ Integer?
readonly
Default priority for runs (1-4).
-
#id ⇒ String?
Get the workflow ID (UUID).
-
#name ⇒ String
readonly
Workflow name.
-
#on_crons ⇒ Array<String>
readonly
Cron expressions that trigger this workflow.
-
#on_events ⇒ Array<String>
readonly
Event keys that trigger this workflow.
-
#on_failure ⇒ Task?
readonly
The on_failure task.
-
#on_success ⇒ Task?
readonly
The on_success task.
-
#sticky ⇒ Symbol?
readonly
Sticky strategy (:soft, :hard).
-
#task_defaults ⇒ Hash?
readonly
Default task settings.
-
#tasks ⇒ Hash<Symbol, Task>
readonly
Map of task name to Task object.
Instance Method Summary
-
#create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash
Create a bulk run item for use with run_many.
-
#create_cron(cron_name, expression, input: {}) ⇒ Object
Create a cron trigger for this workflow.
-
#durable_task(name, eviction_policy: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY, **opts) {|input, ctx| ... } ⇒ Task
Define a durable task within this workflow.
-
#initialize(name:, on_events: [], on_crons: [], concurrency: nil, default_priority: nil, task_defaults: nil, default_filters: [], sticky: nil, client: nil) ⇒ Workflow
constructor
A new instance of Workflow.
-
#on_failure_task(**opts) {|input, ctx| ... } ⇒ Task
Define an on_failure task for this workflow.
-
#on_success_task(**opts) {|input, ctx| ... } ⇒ Task
Define an on_success task for this workflow.
-
#run(input = {}, options: nil) ⇒ Hash
Run this workflow synchronously.
-
#run_many(items, return_exceptions: false) ⇒ Array
Run many instances of this workflow in bulk.
-
#run_many_no_wait(items) ⇒ Array<WorkflowRunRef>
Run many instances without waiting for results.
-
#run_no_wait(input = {}, options: nil) ⇒ WorkflowRunRef
Run this workflow without waiting for the result.
-
#schedule(time, input: {}, options: nil) ⇒ Object
Schedule this workflow for future execution.
-
#task(name, **opts) {|input, ctx| ... } ⇒ Task
Define a task within this workflow.
-
#to_proto(config) ⇒ V1::CreateWorkflowVersionRequest
Convert this workflow to a V1::CreateWorkflowVersionRequest protobuf message.
Constructor Details
#initialize(name:, on_events: [], on_crons: [], concurrency: nil, default_priority: nil, task_defaults: nil, default_filters: [], sticky: nil, client: nil) ⇒ Workflow
Returns a new instance of Workflow.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/hatchet/workflow.rb', line 61

# Build a new workflow definition. Tasks are added afterwards, so the task
# map starts empty and the on_failure/on_success hooks and the id start as nil.
#
# @param name [String] workflow name
# @param on_events [Array<String>] event keys that trigger this workflow
# @param on_crons [Array<String>] cron expressions that trigger this workflow
# @param concurrency [Array<ConcurrencyExpression>, Object, nil] workflow-level concurrency
# @param default_priority [Integer, nil] default priority for runs (1-4)
# @param task_defaults [Hash, nil] default task settings
# @param default_filters [Array<DefaultFilter>] default filters for event triggers
# @param sticky [Symbol, nil] sticky strategy (:soft, :hard)
# @param client [Hatchet::Client, nil] the Hatchet client
# @return [Workflow] a new instance of Workflow
def initialize(
  name:,
  on_events: [],
  on_crons: [],
  concurrency: nil,
  default_priority: nil,
  task_defaults: nil,
  default_filters: [],
  sticky: nil,
  client: nil
)
  @name = name
  @tasks = {}
  @on_events = on_events
  @on_crons = on_crons
  @concurrency = concurrency
  @default_priority = default_priority
  @task_defaults = task_defaults
  @default_filters = default_filters
  @sticky = sticky
  @client = client
  @on_failure = nil
  @on_success = nil
  @id = nil
end
Instance Attribute Details
#client ⇒ Hatchet::Client? (readonly)
Returns The Hatchet client.
41 42 43 |
# File 'lib/hatchet/workflow.rb', line 41

# @return [Hatchet::Client, nil] the Hatchet client
def client
  @client
end
#concurrency ⇒ Array<ConcurrencyExpression>, ... (readonly)
Returns Workflow-level concurrency.
26 27 28 |
# File 'lib/hatchet/workflow.rb', line 26

# @return [Array<ConcurrencyExpression>, Object, nil] workflow-level concurrency,
#   exactly as it was passed to the constructor
def concurrency
  @concurrency
end
#default_filters ⇒ Array<DefaultFilter> (readonly)
Returns Default filters for event triggers.
35 36 37 |
# File 'lib/hatchet/workflow.rb', line 35

# @return [Array<DefaultFilter>] default filters for event triggers
def default_filters
  @default_filters
end
#default_priority ⇒ Integer? (readonly)
Returns Default priority for runs (1-4).
29 30 31 |
# File 'lib/hatchet/workflow.rb', line 29

# @return [Integer, nil] default priority for runs (1-4)
def default_priority
  @default_priority
end
#id ⇒ String?
Get the workflow ID (UUID). If not already set, lazily resolves it by looking up the workflow by name via the REST API.
91 92 93 |
# File 'lib/hatchet/workflow.rb', line 91

# Get the workflow ID (UUID). If not already set, lazily resolves it by
# looking up the workflow by name via the REST API, then memoises the result.
#
# @return [String, nil] the workflow ID
def id
  # `||=` only caches truthy results; a nil lookup is retried on the next call.
  @id ||= resolve_workflow_id
end
#name ⇒ String (readonly)
Returns Workflow name.
14 15 16 |
# File 'lib/hatchet/workflow.rb', line 14

# @return [String] workflow name
def name
  @name
end
#on_crons ⇒ Array<String> (readonly)
Returns Cron expressions that trigger this workflow.
23 24 25 |
# File 'lib/hatchet/workflow.rb', line 23

# @return [Array<String>] cron expressions that trigger this workflow
def on_crons
  @on_crons
end
#on_events ⇒ Array<String> (readonly)
Returns Event keys that trigger this workflow.
20 21 22 |
# File 'lib/hatchet/workflow.rb', line 20

# @return [Array<String>] event keys that trigger this workflow
def on_events
  @on_events
end
#on_failure ⇒ Task? (readonly)
Returns The on_failure task.
44 45 46 |
# File 'lib/hatchet/workflow.rb', line 44

# @return [Task, nil] the on_failure task
def on_failure
  @on_failure
end
#on_success ⇒ Task? (readonly)
Returns The on_success task.
47 48 49 |
# File 'lib/hatchet/workflow.rb', line 47

# @return [Task, nil] the on_success task
def on_success
  @on_success
end
#sticky ⇒ Symbol? (readonly)
Returns Sticky strategy (:soft, :hard).
38 39 40 |
# File 'lib/hatchet/workflow.rb', line 38

# @return [Symbol, nil] sticky strategy (:soft, :hard)
def sticky
  @sticky
end
#task_defaults ⇒ Hash? (readonly)
Returns Default task settings.
32 33 34 |
# File 'lib/hatchet/workflow.rb', line 32

# @return [Hash, nil] default task settings
def task_defaults
  @task_defaults
end
#tasks ⇒ Hash<Symbol, Task> (readonly)
Returns Map of task name to Task object.
17 18 19 |
# File 'lib/hatchet/workflow.rb', line 17

# @return [Hash<Symbol, Task>] map of task name to Task object
def tasks
  @tasks
end
Instance Method Details
#create_bulk_run_item(input: {}, key: nil, options: nil) ⇒ Hash
Create a bulk run item for use with run_many
263 264 265 266 267 268 |
# File 'lib/hatchet/workflow.rb', line 263

# Create a bulk run item for use with run_many.
#
# @param input [Hash] workflow input for this item
# @param key [Object, nil] optional key; omitted from the item when nil/false
# @param options [Object, nil] optional run options; omitted when nil/false
# @return [Hash] always contains :input, plus :key and :options when given
def create_bulk_run_item(input: {}, key: nil, options: nil)
  item = { input: input }
  item[:key] = key if key
  item[:options] = options if options
  item
end
#create_cron(cron_name, expression, input: {}) ⇒ Object
Create a cron trigger for this workflow
288 289 290 291 292 293 294 295 296 297 |
# File 'lib/hatchet/workflow.rb', line 288

# Create a cron trigger for this workflow.
#
# @param cron_name [String] name of the cron trigger
# @param expression [String] cron expression
# @param input [Hash] input passed along with the trigger
# @return [Object] whatever the client's cron `create` call returns
# @raise [Error] if no client is associated with this workflow
def create_cron(cron_name, expression, input: {})
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.cron.create(
    workflow_name: @name,
    cron_name: cron_name,
    expression: expression,
    input: input,
  )
end
#durable_task(name, eviction_policy: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY, **opts) {|input, ctx| ... } ⇒ Task
Define a durable task within this workflow.
123 124 125 |
# File 'lib/hatchet/workflow.rb', line 123

# Define a durable task within this workflow. Delegates to #task with
# `durable: true` and the given eviction policy.
#
# @param name [Symbol, String] task name
# @param eviction_policy [Object] eviction policy; defaults to
#   Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY
# @param opts [Hash] remaining options, forwarded to #task unchanged
# @yield [input, ctx] the task body
# @return [Task] the task returned by #task
def durable_task(name, eviction_policy: Hatchet::DEFAULT_DURABLE_TASK_EVICTION_POLICY, **opts, &)
  task(name, durable: true, eviction_policy: eviction_policy, **opts, &)
end
#on_failure_task(**opts) {|input, ctx| ... } ⇒ Task
Define an on_failure task for this workflow
132 133 134 135 136 137 138 139 140 |
# File 'lib/hatchet/workflow.rb', line 132

# Define an on_failure task for this workflow. The task is always named
# :on_failure and replaces any previously defined on_failure task.
#
# @param opts [Hash] options forwarded to Task.new
# @yield [input, ctx] the task body
# @return [Task] the newly created on_failure task
def on_failure_task(**opts, &)
  @on_failure = Task.new(
    name: :on_failure,
    workflow: self,
    client: @client,
    **opts,
    &
  )
end
#on_success_task(**opts) {|input, ctx| ... } ⇒ Task
Define an on_success task for this workflow
147 148 149 150 151 152 153 154 155 |
# File 'lib/hatchet/workflow.rb', line 147

# Define an on_success task for this workflow. The task is always named
# :on_success and replaces any previously defined on_success task.
#
# @param opts [Hash] options forwarded to Task.new
# @yield [input, ctx] the task body
# @return [Task] the newly created on_success task
def on_success_task(**opts, &)
  @on_success = Task.new(
    name: :on_success,
    workflow: self,
    client: @client,
    **opts,
    &
  )
end
#run(input = {}, options: nil) ⇒ Hash
Run this workflow synchronously
219 220 221 222 223 |
# File 'lib/hatchet/workflow.rb', line 219

# Run this workflow synchronously.
#
# @param input [Hash] workflow input
# @param options [Object, nil] run options, forwarded to the admin client
# @return [Hash] the result returned by the admin client's trigger_workflow
# @raise [Error] if no client is associated with this workflow
def run(input = {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.admin.trigger_workflow(self, input, options: options)
end
#run_many(items, return_exceptions: false) ⇒ Array
Run many instances of this workflow in bulk
241 242 243 244 245 |
# File 'lib/hatchet/workflow.rb', line 241

# Run many instances of this workflow in bulk.
#
# @param items [Array] bulk run items (see #create_bulk_run_item)
# @param return_exceptions [Boolean] forwarded to the admin client unchanged
# @return [Array] the result returned by the admin client's trigger_workflow_many
# @raise [Error] if no client is associated with this workflow
def run_many(items, return_exceptions: false)
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.admin.trigger_workflow_many(self, items, return_exceptions: return_exceptions)
end
#run_many_no_wait(items) ⇒ Array<WorkflowRunRef>
Run many instances without waiting for results
251 252 253 254 255 |
# File 'lib/hatchet/workflow.rb', line 251

# Run many instances without waiting for results.
#
# @param items [Array] bulk run items (see #create_bulk_run_item)
# @return [Array<WorkflowRunRef>] the result returned by the admin client's
#   trigger_workflow_many_no_wait
# @raise [Error] if no client is associated with this workflow
def run_many_no_wait(items)
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.admin.trigger_workflow_many_no_wait(self, items)
end
#run_no_wait(input = {}, options: nil) ⇒ WorkflowRunRef
Run this workflow without waiting for the result
230 231 232 233 234 |
# File 'lib/hatchet/workflow.rb', line 230

# Run this workflow without waiting for the result.
#
# @param input [Hash] workflow input
# @param options [Object, nil] run options, forwarded to the admin client
# @return [WorkflowRunRef] the result returned by the admin client's
#   trigger_workflow_no_wait
# @raise [Error] if no client is associated with this workflow
def run_no_wait(input = {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.admin.trigger_workflow_no_wait(self, input, options: options)
end
#schedule(time, input: {}, options: nil) ⇒ Object
Schedule this workflow for future execution
276 277 278 279 280 |
# File 'lib/hatchet/workflow.rb', line 276

# Schedule this workflow for future execution.
#
# @param time [Object] when to run; passed to the admin client unchanged
# @param input [Hash] workflow input
# @param options [Object, nil] run options, forwarded to the admin client
# @return [Object] whatever the admin client's schedule_workflow returns
# @raise [Error] if no client is associated with this workflow
def schedule(time, input: {}, options: nil)
  raise Error, "No client associated with workflow #{@name}" unless @client

  @client.admin.schedule_workflow(self, time, input: input, options: options)
end
#task(name, **opts) {|input, ctx| ... } ⇒ Task
Define a task within this workflow
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/hatchet/workflow.rb', line 101

# Define a task within this workflow and register it in the task map.
#
# @param name [Symbol, String] task name
# @param opts [Hash] options forwarded to Task.new
# @yield [input, ctx] the task body
# @return [Task] the newly created task
def task(name, **opts, &)
  t = Task.new(
    name: name,
    workflow: self,
    client: @client,
    **opts,
    &
  )
  # Keyed by the Task's own name, not by the raw `name` argument.
  @tasks[t.name] = t
  t
end
#to_proto(config) ⇒ V1::CreateWorkflowVersionRequest
Convert this workflow to a V1::CreateWorkflowVersionRequest protobuf message.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/hatchet/workflow.rb', line 161

# Convert this workflow to a V1::CreateWorkflowVersionRequest protobuf message.
#
# Optional fields (concurrency, on_failure_task, sticky, default_priority,
# default_filters) are only set when this workflow has a value for them.
#
# @param config [#apply_namespace] configuration used to namespace the
#   workflow name, the service name and the event keys
# @return [V1::CreateWorkflowVersionRequest]
def to_proto(config)
  # Tasks are registered under the namespaced, downcased workflow name.
  service_name = config.apply_namespace(@name.downcase)

  # Namespace event triggers. A nil list is tolerated, consistent with the
  # nil handling of @on_crons and @default_filters below.
  event_triggers = (@on_events || []).map { |e| config.apply_namespace(e) }

  # Convert tasks to proto
  task_protos = @tasks.values.map { |t| t.to_proto(service_name, config: config) }

  # On-failure task
  # NOTE(review): @on_success is not serialized by this method - confirm intended.
  on_failure_proto = @on_failure&.to_proto(service_name, config: config)

  # Build concurrency: exactly one expression goes in the singular field,
  # any other count goes in the array field (an empty list sets neither).
  concurrency_proto = nil
  concurrency_arr = []
  if @concurrency
    conc_list = @concurrency.is_a?(Array) ? @concurrency : [@concurrency]
    if conc_list.length == 1
      concurrency_proto = conc_list.first.to_proto
    else
      concurrency_arr = conc_list.map(&:to_proto)
    end
  end

  # Sticky strategy: values other than :soft / :hard map to nil and are omitted.
  sticky_proto = nil
  if @sticky
    sticky_map = { soft: :SOFT, hard: :HARD }
    sticky_proto = sticky_map[@sticky]
  end

  # Default filters
  filter_protos = (@default_filters || []).map(&:to_proto)

  args = {
    name: config.apply_namespace(@name),
    event_triggers: event_triggers,
    cron_triggers: @on_crons || [],
    tasks: task_protos,
  }

  args[:concurrency] = concurrency_proto if concurrency_proto
  args[:concurrency_arr] = concurrency_arr unless concurrency_arr.empty?
  args[:on_failure_task] = on_failure_proto if on_failure_proto
  args[:sticky] = sticky_proto if sticky_proto
  args[:default_priority] = @default_priority if @default_priority
  args[:default_filters] = filter_protos unless filter_protos.empty?

  ::V1::CreateWorkflowVersionRequest.new(**args)
end