Class: Hatchet::Context
- Inherits:
-
Object
- Object
- Hatchet::Context
- Defined in:
- lib/hatchet/context.rb
Overview
Context object passed to task execution blocks.
Provides access to workflow run metadata, parent task outputs, logging, cancellation, and other runtime capabilities.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#additional_metadata ⇒ Hash
readonly
Additional metadata attached to this run.
-
#attempt_number ⇒ Integer
readonly
Current attempt number (retry_count + 1).
-
#deps ⇒ Hash
Resolved dependency values.
-
#filter_payload ⇒ Hash?
readonly
Filter payload for event-triggered workflows.
-
#priority ⇒ Integer?
readonly
Task priority.
-
#retry_count ⇒ Integer
readonly
Current retry count (0 on first attempt).
-
#step_run_id ⇒ String
readonly
The step run ID.
-
#worker_id ⇒ String?
readonly
The worker ID assigned by the server.
-
#workflow_run_id ⇒ String
readonly
The workflow run ID.
Instance Method Summary collapse
-
#cancel ⇒ Object
Cancel the current workflow run.
-
#cancelled? ⇒ Boolean
Check if the task has been cancelled.
-
#get_task_run_error(task_ref) ⇒ TaskRunError?
Get the error from a specific upstream task (used in on_failure tasks).
-
#initialize(workflow_run_id:, step_run_id:, action: nil, client: nil, dispatcher_client: nil, event_client: nil, additional_metadata: {}, retry_count: 0, parent_outputs: {}, deps: {}, priority: nil, filter_payload: nil, worker_context: nil, worker_id: nil) ⇒ Context
constructor
A new instance of Context.
-
#log(message) ⇒ Object
Log a message via the Hatchet logging system.
-
#put_stream(data) ⇒ Object
Put a stream chunk for real-time streaming output.
-
#refresh_timeout(duration) ⇒ Object
Refresh the execution timeout for this task.
-
#release_slot ⇒ Object
Release the worker slot before the task completes.
-
#task_output(task_ref) ⇒ Hash?
Get the output of a parent task.
-
#task_run_errors ⇒ Array<TaskRunError>
Get errors from upstream task runs (used in on_failure tasks).
-
#was_skipped?(task_ref) ⇒ Boolean
Check if a parent task was skipped.
-
#worker ⇒ WorkerContext?
Access the worker context for worker-level operations.
Constructor Details
#initialize(workflow_run_id:, step_run_id:, action: nil, client: nil, dispatcher_client: nil, event_client: nil, additional_metadata: {}, retry_count: 0, parent_outputs: {}, deps: {}, priority: nil, filter_payload: nil, worker_context: nil, worker_id: nil) ⇒ Context
Returns a new instance of Context.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/hatchet/context.rb', line 55 def initialize( workflow_run_id:, step_run_id:, action: nil, client: nil, dispatcher_client: nil, event_client: nil, additional_metadata: {}, retry_count: 0, parent_outputs: {}, deps: {}, priority: nil, filter_payload: nil, worker_context: nil, worker_id: nil ) @workflow_run_id = workflow_run_id @step_run_id = step_run_id @worker_id = worker_id @action = action @client = client @dispatcher_client = dispatcher_client @event_client = event_client @additional_metadata = || {} @retry_count = retry_count @attempt_number = retry_count + 1 @parent_outputs = parent_outputs || {} @deps = deps || {} @priority = priority @filter_payload = filter_payload @worker_context = worker_context @exit_flag = false @cancelled = false end |
Instance Attribute Details
#additional_metadata ⇒ Hash (readonly)
Returns Additional metadata attached to this run.
22 23 24 |
# File 'lib/hatchet/context.rb', line 22 def @additional_metadata end |
#attempt_number ⇒ Integer (readonly)
Returns Current attempt number (retry_count + 1).
28 29 30 |
# File 'lib/hatchet/context.rb', line 28 def attempt_number @attempt_number end |
#deps ⇒ Hash
Returns Resolved dependency values.
31 32 33 |
# File 'lib/hatchet/context.rb', line 31 def deps @deps end |
#filter_payload ⇒ Hash? (readonly)
Returns Filter payload for event-triggered workflows.
37 38 39 |
# File 'lib/hatchet/context.rb', line 37 def filter_payload @filter_payload end |
#priority ⇒ Integer? (readonly)
Returns Task priority.
34 35 36 |
# File 'lib/hatchet/context.rb', line 34 def priority @priority end |
#retry_count ⇒ Integer (readonly)
Returns Current retry count (0 on first attempt).
25 26 27 |
# File 'lib/hatchet/context.rb', line 25 def retry_count @retry_count end |
#step_run_id ⇒ String (readonly)
Returns The step run ID.
19 20 21 |
# File 'lib/hatchet/context.rb', line 19 def step_run_id @step_run_id end |
#worker_id ⇒ String? (readonly)
Returns The worker ID assigned by the server.
40 41 42 |
# File 'lib/hatchet/context.rb', line 40 def worker_id @worker_id end |
#workflow_run_id ⇒ String (readonly)
Returns The workflow run ID.
16 17 18 |
# File 'lib/hatchet/context.rb', line 16 def workflow_run_id @workflow_run_id end |
Instance Method Details
#cancel ⇒ Object
Cancel the current workflow run
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/hatchet/context.rb', line 132 def cancel @cancelled = true @exit_flag = true return unless @client && @workflow_run_id begin @client.runs.cancel(@workflow_run_id) rescue StandardError nil end end |
#cancelled? ⇒ Boolean
Check if the task has been cancelled
147 148 149 |
# File 'lib/hatchet/context.rb', line 147 def cancelled? @exit_flag end |
#get_task_run_error(task_ref) ⇒ TaskRunError?
Get the error from a specific upstream task (used in on_failure tasks)
191 192 193 194 195 196 197 198 199 |
# File 'lib/hatchet/context.rb', line 191 def get_task_run_error(task_ref) key = case task_ref when Symbol then task_ref.to_s when String then task_ref else task_ref.respond_to?(:name) ? task_ref.name.to_s : task_ref.to_s end task_run_errors.find { |e| e.respond_to?(:task_name) && e.task_name == key } end |
#log(message) ⇒ Object
Log a message via the Hatchet logging system. Sends the log to the server via gRPC if an event client is available.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/hatchet/context.rb', line 116 def log() msg = .is_a?(String) ? : .inspect # Send log to server via gRPC if @event_client && @step_run_id begin @event_client.put_log(step_run_id: @step_run_id, message: msg) rescue StandardError => e @client&.logger&.warn("Failed to send log to server: #{e.}") end end @client&.logger&.info(msg) || puts(msg) end |
#put_stream(data) ⇒ Object
Put a stream chunk for real-time streaming output.
174 175 176 177 178 |
# File 'lib/hatchet/context.rb', line 174 def put_stream(data) return unless @event_client && @step_run_id @event_client.put_stream(step_run_id: @step_run_id, data: data) end |
#refresh_timeout(duration) ⇒ Object
Refresh the execution timeout for this task.
154 155 156 157 158 159 160 161 |
# File 'lib/hatchet/context.rb', line 154 def refresh_timeout(duration) return unless @dispatcher_client && @step_run_id @dispatcher_client.refresh_timeout( step_run_id: @step_run_id, timeout_seconds: duration, ) end |
#release_slot ⇒ Object
Release the worker slot before the task completes. Useful for tasks that have a resource-intensive phase followed by a lighter phase.
165 166 167 168 169 |
# File 'lib/hatchet/context.rb', line 165 def release_slot return unless @dispatcher_client && @step_run_id @dispatcher_client.release_slot(step_run_id: @step_run_id) end |
#task_output(task_ref) ⇒ Hash?
Get the output of a parent task
94 95 96 97 98 99 100 101 102 |
# File 'lib/hatchet/context.rb', line 94 def task_output(task_ref) key = case task_ref when Symbol then task_ref.to_s when String then task_ref else task_ref.respond_to?(:name) ? task_ref.name.to_s : task_ref.to_s end @parent_outputs[key] || @parent_outputs[key.to_sym] end |
#task_run_errors ⇒ Array<TaskRunError>
Get errors from upstream task runs (used in on_failure tasks)
183 184 185 |
# File 'lib/hatchet/context.rb', line 183 def task_run_errors @action.respond_to?(:task_run_errors) ? @action.task_run_errors : [] end |
#was_skipped?(task_ref) ⇒ Boolean
Check if a parent task was skipped
108 109 110 |
# File 'lib/hatchet/context.rb', line 108 def was_skipped?(task_ref) task_output(task_ref).nil? end |
#worker ⇒ WorkerContext?
Access the worker context for worker-level operations
204 205 206 |
# File 'lib/hatchet/context.rb', line 204 def worker @worker_context end |