Class: Hatchet::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/context.rb

Overview

Context object passed to task execution blocks.

Provides access to workflow run metadata, parent task outputs, logging, cancellation, and other runtime capabilities.

Examples:

Accessing parent output

workflow.task(:step2, parents: [step1]) do |input, ctx|
  parent_result = ctx.task_output(step1)
  { "sum" => parent_result["value"] + 1 }
end

Direct Known Subclasses

DurableContext

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow_run_id:, step_run_id:, action: nil, client: nil, dispatcher_client: nil, event_client: nil, additional_metadata: {}, retry_count: 0, parent_outputs: {}, deps: {}, priority: nil, filter_payload: nil, worker_context: nil, worker_id: nil) ⇒ Context

Returns a new instance of Context.

Parameters:

  • workflow_run_id (String)

    The workflow run ID

  • step_run_id (String)

    The step run ID

  • action (Object, nil) (defaults to: nil)

    The action object from the dispatcher

  • client (Hatchet::Client, nil) (defaults to: nil)

    The Hatchet client

  • dispatcher_client (Hatchet::Clients::Grpc::Dispatcher, nil) (defaults to: nil)

    gRPC dispatcher client

  • event_client (Hatchet::Clients::Grpc::EventClient, nil) (defaults to: nil)

    gRPC event client

  • additional_metadata (Hash) (defaults to: {})

    Additional metadata

  • retry_count (Integer) (defaults to: 0)

    Current retry count

  • parent_outputs (Hash) (defaults to: {})

    Hash of parent task name -> output

  • deps (Hash) (defaults to: {})

    Resolved dependency values

  • priority (Integer, nil) (defaults to: nil)

    Priority

  • filter_payload (Hash, nil) (defaults to: nil)

    Filter payload

  • worker_context (Object, nil) (defaults to: nil)

    Worker context for worker-level operations

  • worker_id (String, nil) (defaults to: nil)

    The worker ID assigned by the server



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/hatchet/context.rb', line 55

# Builds the context handed to a task execution block.
#
# @param workflow_run_id [String] The workflow run ID
# @param step_run_id [String] The step run ID
# @param action [Object, nil] The action object from the dispatcher
# @param client [Hatchet::Client, nil] The Hatchet client
# @param dispatcher_client [Hatchet::Clients::Grpc::Dispatcher, nil] gRPC dispatcher client
# @param event_client [Hatchet::Clients::Grpc::EventClient, nil] gRPC event client
# @param additional_metadata [Hash, nil] Additional metadata; nil is stored as {}
# @param retry_count [Integer] Current retry count
# @param parent_outputs [Hash, nil] Hash of parent task name -> output; nil is stored as {}
# @param deps [Hash, nil] Resolved dependency values; nil is stored as {}
# @param priority [Integer, nil] Priority
# @param filter_payload [Hash, nil] Filter payload
# @param worker_context [Object, nil] Worker context for worker-level operations
# @param worker_id [String, nil] The worker ID assigned by the server
def initialize(
  workflow_run_id:,
  step_run_id:,
  action: nil,
  client: nil,
  dispatcher_client: nil,
  event_client: nil,
  additional_metadata: {},
  retry_count: 0,
  parent_outputs: {},
  deps: {},
  priority: nil,
  filter_payload: nil,
  worker_context: nil,
  worker_id: nil
)
  @workflow_run_id = workflow_run_id
  @step_run_id = step_run_id
  @worker_id = worker_id
  @action = action
  @client = client
  @dispatcher_client = dispatcher_client
  @event_client = event_client
  # Callers may pass an explicit nil; normalise it so readers always get a Hash.
  @additional_metadata = additional_metadata || {}
  @retry_count = retry_count
  # Attempts are 1-based while retries are 0-based.
  @attempt_number = retry_count + 1
  @parent_outputs = parent_outputs || {}
  @deps = deps || {}
  @priority = priority
  @filter_payload = filter_payload
  @worker_context = worker_context
  # Both flags start cleared; #cancel raises them together.
  @exit_flag = false
  @cancelled = false
end

Instance Attribute Details

#additional_metadata ⇒ Hash (readonly)

Returns Additional metadata attached to this run.

Returns:

  • (Hash)

    Additional metadata attached to this run



22
23
24
# File 'lib/hatchet/context.rb', line 22

# Reader for the metadata stored by the constructor.
#
# @return [Hash] Additional metadata attached to this run
def additional_metadata
  @additional_metadata
end

#attempt_number ⇒ Integer (readonly)

Returns Current attempt number (retry_count + 1).

Returns:

  • (Integer)

    Current attempt number (retry_count + 1)



28
29
30
# File 'lib/hatchet/context.rb', line 28

# Reader for the 1-based attempt counter; the constructor stores it as
# retry_count + 1.
#
# @return [Integer] Current attempt number (retry_count + 1)
def attempt_number
  @attempt_number
end

#deps ⇒ Hash

Returns Resolved dependency values.

Returns:

  • (Hash)

    Resolved dependency values



31
32
33
# File 'lib/hatchet/context.rb', line 31

# Reader for the dependency values; the constructor stores {} when given nil.
#
# @return [Hash] Resolved dependency values
def deps
  @deps
end

#filter_payload ⇒ Hash? (readonly)

Returns Filter payload for event-triggered workflows.

Returns:

  • (Hash, nil)

    Filter payload for event-triggered workflows



37
38
39
# File 'lib/hatchet/context.rb', line 37

# Reader for the filter payload supplied at construction (nil by default).
#
# @return [Hash, nil] Filter payload for event-triggered workflows
def filter_payload
  @filter_payload
end

#priority ⇒ Integer? (readonly)

Returns Task priority.

Returns:

  • (Integer, nil)

    Task priority



34
35
36
# File 'lib/hatchet/context.rb', line 34

# Reader for the priority supplied at construction (nil by default).
#
# @return [Integer, nil] Task priority
def priority
  @priority
end

#retry_count ⇒ Integer (readonly)

Returns Current retry count (0 on first attempt).

Returns:

  • (Integer)

    Current retry count (0 on first attempt)



25
26
27
# File 'lib/hatchet/context.rb', line 25

# Reader for the retry counter supplied at construction (defaults to 0).
#
# @return [Integer] Current retry count (0 on first attempt)
def retry_count
  @retry_count
end

#step_run_id ⇒ String (readonly)

Returns The step run ID.

Returns:

  • (String)

    The step run ID



19
20
21
# File 'lib/hatchet/context.rb', line 19

# Reader for the step run ID supplied at construction.
#
# @return [String] The step run ID
def step_run_id
  @step_run_id
end

#worker_id ⇒ String? (readonly)

Returns The worker ID assigned by the server.

Returns:

  • (String, nil)

    The worker ID assigned by the server



40
41
42
# File 'lib/hatchet/context.rb', line 40

# Reader for the worker ID supplied at construction (nil by default).
#
# @return [String, nil] The worker ID assigned by the server
def worker_id
  @worker_id
end

#workflow_run_id ⇒ String (readonly)

Returns The workflow run ID.

Returns:

  • (String)

    The workflow run ID



16
17
18
# File 'lib/hatchet/context.rb', line 16

# Reader for the workflow run ID supplied at construction.
#
# @return [String] The workflow run ID
def workflow_run_id
  @workflow_run_id
end

Instance Method Details

#cancel ⇒ Object

Cancel the current workflow run



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/hatchet/context.rb', line 132

# Cancel the current workflow run.
#
# Always raises the local cancellation flags first, so #cancelled? reports
# true even when no remote call can be made. When a client and a workflow run
# ID are both present, the run is then cancelled through the client. That
# remote call is best-effort: a StandardError is reported to the client's
# logger (when it exposes one) instead of propagating.
#
# @return [Object, nil] the result of the client's cancel call, or nil when
#   the call was skipped or failed
def cancel
  @cancelled = true
  @exit_flag = true
  return unless @client && @workflow_run_id

  begin
    @client.runs.cancel(@workflow_run_id)
  rescue StandardError => e
    # Surface the failure rather than dropping it silently. The respond_to?
    # check keeps this path from raising for clients that expose no logger.
    logger = @client.respond_to?(:logger) ? @client.logger : nil
    logger&.warn("Failed to cancel workflow run: #{e.message}")
    nil
  end
end

#cancelled? ⇒ Boolean

Check if the task has been cancelled

Returns:

  • (Boolean)

    true if cancellation has been requested



147
148
149
# File 'lib/hatchet/context.rb', line 147

# Check if the task has been cancelled.
#
# Reads @exit_flag (not @cancelled); the constructor initialises it to false
# and #cancel sets it to true together with @cancelled.
#
# @return [Boolean] true if cancellation has been requested
def cancelled?
  @exit_flag
end

#get_task_run_error(task_ref) ⇒ TaskRunError?

Get the error from a specific upstream task (used in on_failure tasks)

Parameters:

  • task_ref (Task, Symbol, String)

    Reference to the failed task

Returns:

  • (TaskRunError, nil)

    The first upstream error whose task name matches, or nil when none matches



191
192
193
194
195
196
197
198
199
# File 'lib/hatchet/context.rb', line 191

# Get the error from a specific upstream task (used in on_failure tasks).
#
# The reference is reduced to a String name first: Symbols are stringified,
# Strings are used as-is, objects responding to #name contribute name.to_s,
# and anything else falls back to its own #to_s. Entries of #task_run_errors
# that do not respond to #task_name are ignored.
#
# @param task_ref [Task, Symbol, String] Reference to the failed task
# @return [TaskRunError, nil] the first matching error, or nil when none matches
def get_task_run_error(task_ref)
  wanted_name =
    if task_ref.is_a?(Symbol)
      task_ref.to_s
    elsif task_ref.is_a?(String)
      task_ref
    elsif task_ref.respond_to?(:name)
      task_ref.name.to_s
    else
      task_ref.to_s
    end

  task_run_errors.find do |candidate|
    candidate.respond_to?(:task_name) && candidate.task_name == wanted_name
  end
end

#log(message) ⇒ Object

Log a message via the Hatchet logging system. Sends the log to the server via gRPC if an event client is available.

Parameters:

  • message (String, Hash)

    The message to log



116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/hatchet/context.rb', line 116

# Log a message via the Hatchet logging system.
#
# Non-String messages are rendered with #inspect. When an event client and a
# step run ID are both present the message is first sent to the server; a
# StandardError from that call is rescued and reported as a warning on the
# client logger. The message is then written locally: to the client logger
# when one exists, otherwise to standard output.
#
# @param message [String, Hash] The message to log
# @return [Object, nil] whatever the logger's #info returns, or nil when the
#   message was written with puts
def log(message)
  msg = message.is_a?(String) ? message : message.inspect

  # Send log to server via gRPC
  if @event_client && @step_run_id
    begin
      @event_client.put_log(step_run_id: @step_run_id, message: msg)
    rescue StandardError => e
      @client&.logger&.warn("Failed to send log to server: #{e.message}")
    end
  end

  # Choose the local sink by whether a logger exists, not by what #info
  # returns: a logger whose #info returns nil/false would otherwise fall
  # through to puts and emit the message twice.
  logger = @client&.logger
  if logger
    logger.info(msg)
  else
    puts(msg)
  end
end

#put_stream(data) ⇒ Object

Put a stream chunk for real-time streaming output.

Parameters:

  • data (String)

    The chunk data to stream



174
175
176
177
178
# File 'lib/hatchet/context.rb', line 174

# Put a stream chunk for real-time streaming output.
#
# Does nothing (and returns nil) unless both an event client and a step run
# ID are present.
#
# @param data [String] The chunk data to stream
# @return [Object, nil] the event client's return value, or nil when skipped
def put_stream(data)
  streamer = @event_client
  run_id = @step_run_id
  return if !streamer || !run_id

  streamer.put_stream(step_run_id: run_id, data: data)
end

#refresh_timeout(duration) ⇒ Object

Refresh the execution timeout for this task.

Parameters:

  • duration (Integer, String)

    New timeout in seconds, or a duration string



154
155
156
157
158
159
160
161
# File 'lib/hatchet/context.rb', line 154

# Refresh the execution timeout for this task.
#
# The duration is handed to the dispatcher client unchanged as
# timeout_seconds. Does nothing (and returns nil) unless both a dispatcher
# client and a step run ID are present.
#
# @param duration [Integer, String] New timeout in seconds, or a duration string
# @return [Object, nil] the dispatcher client's return value, or nil when skipped
def refresh_timeout(duration)
  dispatcher = @dispatcher_client
  run_id = @step_run_id
  return if !dispatcher || !run_id

  dispatcher.refresh_timeout(step_run_id: run_id, timeout_seconds: duration)
end

#release_slot ⇒ Object

Release the worker slot before the task completes. Useful for tasks that have a resource-intensive phase followed by a lighter phase.



165
166
167
168
169
# File 'lib/hatchet/context.rb', line 165

# Release the worker slot before the task completes.
#
# Does nothing (and returns nil) unless both a dispatcher client and a step
# run ID are present.
#
# @return [Object, nil] the dispatcher client's return value, or nil when skipped
def release_slot
  dispatcher = @dispatcher_client
  run_id = @step_run_id
  return if !dispatcher || !run_id

  dispatcher.release_slot(step_run_id: run_id)
end

#task_output(task_ref) ⇒ Hash?

Get the output of a parent task

Parameters:

  • task_ref (Task, Symbol, String)

    Reference to the parent task

Returns:

  • (Hash, nil)

    The parent task’s output



94
95
96
97
98
99
100
101
102
# File 'lib/hatchet/context.rb', line 94

# Get the output of a parent task.
#
# The reference is reduced to a String name (Symbols are stringified, Strings
# are used as-is, objects responding to #name contribute name.to_s, anything
# else its own #to_s). The parent outputs are looked up by that String first
# and, when that yields nothing truthy, by the equivalent Symbol.
#
# @param task_ref [Task, Symbol, String] Reference to the parent task
# @return [Hash, nil] The parent task's output
def task_output(task_ref)
  task_name =
    if task_ref.is_a?(Symbol)
      task_ref.to_s
    elsif task_ref.is_a?(String)
      task_ref
    elsif task_ref.respond_to?(:name)
      task_ref.name.to_s
    else
      task_ref.to_s
    end

  by_string = @parent_outputs[task_name]
  return by_string if by_string

  @parent_outputs[task_name.to_sym]
end

#task_run_errors ⇒ Array<TaskRunError>

Get errors from upstream task runs (used in on_failure tasks)

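Returns:

  • (Array<TaskRunError>)

    Errors reported by the action, or an empty array when the action exposes none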



183
184
185
# File 'lib/hatchet/context.rb', line 183

# Get errors from upstream task runs (used in on_failure tasks).
#
# Delegates to the action when it responds to #task_run_errors; otherwise
# (including when no action was given) an empty array is returned.
#
# @return [Array<TaskRunError>] the action's errors, or []
def task_run_errors
  return [] unless @action.respond_to?(:task_run_errors)

  @action.task_run_errors
end

#was_skipped?(task_ref) ⇒ Boolean

Check if a parent task was skipped

Parameters:

  • task_ref (Task, Symbol, String)

    Reference to the parent task

Returns:

  • (Boolean)

    true if the task was skipped



108
109
110
# File 'lib/hatchet/context.rb', line 108

# Check if a parent task was skipped.
#
# Implemented as a nil check on #task_output, so it returns true for any
# reference that #task_output resolves to nil.
#
# @param task_ref [Task, Symbol, String] Reference to the parent task
# @return [Boolean] true if the task was skipped
def was_skipped?(task_ref)
  task_output(task_ref).nil?
end

#worker ⇒ WorkerContext?

Access the worker context for worker-level operations

Returns:

  • (WorkerContext, nil)

    The worker context given at construction, or nil when none was provided



204
205
206
# File 'lib/hatchet/context.rb', line 204

# Access the worker context for worker-level operations.
#
# @return [WorkerContext, nil] the worker context given at construction
#   (nil by default)
def worker
  @worker_context
end