Class: Hatchet::Clients::Grpc::Dispatcher

Inherits:
Object
Defined in:
lib/hatchet/clients/grpc/dispatcher.rb

Overview

gRPC client for the Hatchet Dispatcher service.

Handles worker registration, action listening, result reporting, heartbeats, and other dispatcher interactions.

Uses the generated Dispatcher::Stub from dispatcher.proto for v0 RPCs, and shares a gRPC channel provided by Hatchet::Connection.

Examples:

dispatcher = Dispatcher.new(config: hatchet_config, channel: channel)
response = dispatcher.register(name: "my-worker", ...)
dispatcher.listen(worker_id: response.worker_id) { |action| ... }

Constructor Details

#initialize(config:, channel:) ⇒ Dispatcher

Returns a new instance of Dispatcher.

Parameters:

  • config (Hatchet::Config)

    The Hatchet configuration

  • channel (GRPC::Core::Channel)

    Shared gRPC channel

# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 26

def initialize(config:, channel:)
  @config = config
  @logger = config.logger
  @channel = channel
  @stub = nil
  @worker_id = nil
end

Instance Attribute Details

#worker_id ⇒ String? (readonly)

Returns the worker ID assigned after registration, or nil if the worker has not registered yet.

Returns:

  • (String, nil)

    Worker ID assigned after registration



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 24

def worker_id
  @worker_id
end

Instance Method Details

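#close ⇒ Object

Release the client's stub reference. As the source below shows, this only clears @stub; it does not close the shared gRPC channel itself.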



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 258

def close
  @stub = nil
end

#get_version ⇒ String?

Fetch the engine version via the dispatcher's GetVersion RPC.

Returns the engine's semantic version string (e.g. "v0.80.0"), or nil if the engine is too old to support GetVersion (i.e. the call comes back UNIMPLEMENTED).

Returns:

  • (String, nil)

    The engine semantic version, or nil when unsupported



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 248

def get_version
  ensure_connected!

  response = @stub.get_version(::GetVersionRequest.new, metadata: @config.)
  response.version
rescue ::GRPC::Unimplemented
  nil
end
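
Because get_version returns nil for older engines, callers generally need a nil-safe comparison before enabling newer features. The following is a minimal sketch, assuming the version string has the "v0.80.0" form shown above. engine_at_least? is a hypothetical helper and not part of the SDK.

```ruby
require "rubygems"

# Hypothetical helper: nil-safe minimum-version check for the value
# returned by Dispatcher#get_version.
def engine_at_least?(version, minimum)
  # nil means the engine predates GetVersion, so treat it as too old.
  return false if version.nil?

  # Strip the leading "v" before handing the strings to Gem::Version.
  Gem::Version.new(version.delete_prefix("v")) >=
    Gem::Version.new(minimum.delete_prefix("v"))
end

puts engine_at_least?("v0.80.0", "v0.78.0")
puts engine_at_least?(nil, "v0.78.0")
```

Treating nil as "too old" is a design choice for this sketch; a caller could equally fall back to feature probing.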

#heartbeat(worker_id:) ⇒ HeartbeatResponse

Send a heartbeat to keep the worker registration alive.

Parameters:

  • worker_id (String)

    The worker ID

Returns:

  • (HeartbeatResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 114

def heartbeat(worker_id:)
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  request = ::HeartbeatRequest.new(
    worker_id: worker_id,
    heartbeat_at: timestamp,
  )

  @stub.heartbeat(request, metadata: @config.)
end
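
A heartbeat only keeps the registration alive if it is sent repeatedly. The sketch below shows one way a caller might drive heartbeat in a loop. FakeDispatcher stands in for the real client so the example runs without an engine, and run_heartbeats, the interval, and the stop callback are all assumptions; the source does not state the expected cadence.

```ruby
# Stand-in for the real client so the sketch runs without a Hatchet engine.
class FakeDispatcher
  attr_reader :beats

  def initialize
    @beats = []
  end

  # Same keyword signature as Dispatcher#heartbeat; records the call only.
  def heartbeat(worker_id:)
    @beats << worker_id
  end
end

# Hypothetical helper: send heartbeats until `stop` returns true.
def run_heartbeats(dispatcher, worker_id:, interval:, stop:)
  until stop.call
    dispatcher.heartbeat(worker_id: worker_id)
    sleep(interval)
  end
end

dispatcher = FakeDispatcher.new
run_heartbeats(dispatcher, worker_id: "worker-123", interval: 0.01,
               stop: -> { dispatcher.beats.length >= 3 })
puts dispatcher.beats.length
```

In a real worker this loop would normally run on its own thread so it does not block action handling.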

#listen(worker_id:) ⇒ Enumerator

Listen for action assignments via gRPC server-streaming (ListenV2).

Returns an Enumerator of AssignedAction messages. The caller is responsible for iterating and handling reconnection.

Parameters:

  • worker_id (String)

    The registered worker ID

Returns:

  • (Enumerator)

    Stream of AssignedAction messages



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 103

def listen(worker_id:)
  ensure_connected!

  request = ::WorkerListenRequest.new(worker_id: worker_id)
  @stub.listen_v2(request, metadata: @config.)
end
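
Since listen leaves iteration and reconnection to the caller, a consumer typically wraps it in a retry loop. The following is a minimal sketch with exponential backoff. FlakyDispatcher and StreamBroken are stand-ins invented for this example, and listen_with_retry is a hypothetical helper; with the grpc gem one would rescue GRPC::BadStatus or a subclass instead.

```ruby
# Local stand-in for a transport failure (assumption for this sketch).
class StreamBroken < StandardError; end

# Fake dispatcher: the first stream breaks after one action,
# the second delivers two actions and ends cleanly.
class FlakyDispatcher
  def initialize
    @calls = 0
  end

  def listen(worker_id:)
    @calls += 1
    attempt = @calls
    Enumerator.new do |y|
      y << "action-#{attempt}-a"
      raise StreamBroken, "stream reset" if attempt == 1

      y << "action-#{attempt}-b"
    end
  end
end

# Hypothetical helper: iterate the stream, reopening it after a failure.
def listen_with_retry(dispatcher, worker_id:, max_retries: 3, base_delay: 0.01)
  retries = 0
  begin
    dispatcher.listen(worker_id: worker_id).each { |action| yield action }
  rescue StreamBroken
    retries += 1
    raise if retries > max_retries

    sleep(base_delay * (2**(retries - 1))) # exponential backoff
    retry
  end
end

received = []
listen_with_retry(FlakyDispatcher.new, worker_id: "worker-123") { |a| received << a }
puts received.inspect
```

This sketch returns when the stream ends cleanly; a long-running worker would likely reopen the stream in that case as well.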

#refresh_timeout(step_run_id:, timeout_seconds:) ⇒ RefreshTimeoutResponse

Refresh the timeout for a running task.

Parameters:

  • step_run_id (String)

    The task run external ID

  • timeout_seconds (Integer, String)

    New timeout increment (in seconds or as a duration string)

Returns:

  • (RefreshTimeoutResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 175

def refresh_timeout(step_run_id:, timeout_seconds:)
  ensure_connected!

  increment = timeout_seconds.is_a?(String) ? timeout_seconds : "#{timeout_seconds}s"

  request = ::RefreshTimeoutRequest.new(
    task_run_external_id: step_run_id,
    increment_timeout_by: increment,
  )

  @stub.refresh_timeout(request, metadata: @config.)
end
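
The timeout_seconds argument is normalized before it is sent: a String passes through unchanged and any other value gets an "s" suffix. The rule can be isolated as below. timeout_increment is a hypothetical name, and "5m" is only an assumed example of a duration string the engine might accept.

```ruby
# Mirrors the normalization inside refresh_timeout:
# strings pass through, everything else is interpolated with an "s" suffix.
def timeout_increment(timeout_seconds)
  timeout_seconds.is_a?(String) ? timeout_seconds : "#{timeout_seconds}s"
end

puts timeout_increment(30)
puts timeout_increment("5m")
```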

#register(name:, actions:, slots:, slot_config: nil, labels: {}) ⇒ WorkerRegisterResponse

Register a worker with the dispatcher.

Parameters:

  • name (String)

    Worker name

  • actions (Array<String>)

    List of action IDs this worker handles

  • slots (Integer)

    Number of concurrent task slots

  • slot_config (Hash{String => Integer}, nil) (defaults to: nil)

    Slot counts by slot type

  • labels (Hash) (defaults to: {})

    Worker labels (String keys, String or Integer values)

Returns:

  • (WorkerRegisterResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 42

def register(name:, actions:, slots:, slot_config: nil, labels: {})
  ensure_connected!

  label_map = labels.each_with_object({}) do |(k, v), map|
    wl = if v.is_a?(Integer)
           ::WorkerLabels.new(int_value: v)
         else
           ::WorkerLabels.new(str_value: v.to_s)
         end
    map[k.to_s] = wl
  end

  runtime_info = ::RuntimeInfo.new(
    language: :RUBY,
    sdk_version: Hatchet::VERSION,
    language_version: RUBY_VERSION,
    os: RUBY_PLATFORM,
  )

  request_args = {
    worker_name: name,
    actions: actions,
    labels: label_map,
    runtime_info: runtime_info,
  }

  if slot_config && !slot_config.empty?
    request_args[:slot_config] = slot_config
  else
    request_args[:slots] = slots
  end

  request = ::WorkerRegisterRequest.new(**request_args)

  begin
    response = @stub.register(request, metadata: @config.)
  rescue ::GRPC::BadStatus => e
    request = ::WorkerRegisterRequest.new(
      worker_name: name,
      actions: actions,
      slots: slots,
      labels: label_map,
    )
    response = @stub.register(request, metadata: @config.)
    @logger.warn("Registered without runtime_info/slot_config after #{e.class}; " \
                 "engine may be too old for modern Ruby worker registration. " \
                 "Consider upgrading your Hatchet engine.")
  end

  @worker_id = response.worker_id
  @logger.info("Registered worker '#{name}' with #{actions.length} action(s), worker_id=#{response.worker_id}")
  response
end
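
The interaction between slots and slot_config is easy to miss: a non-empty slot_config replaces slots in the request, while nil or an empty hash falls back to slots. A minimal sketch of that rule follows. slot_args is a hypothetical extraction of the branch in register, and the "default" slot type name is an assumption.

```ruby
# Hypothetical extraction of the slot handling in register.
def slot_args(slots:, slot_config: nil)
  if slot_config && !slot_config.empty?
    { slot_config: slot_config } # per-type counts take precedence
  else
    { slots: slots }             # plain slot count as the fallback
  end
end

p slot_args(slots: 5)
p slot_args(slots: 5, slot_config: {})
p slot_args(slots: 5, slot_config: { "default" => 10 })
```

Note that register also retries without runtime_info and slot_config when the first call raises GRPC::BadStatus, so on older engines the slots value is what ends up being used.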

#release_slot(step_run_id:) ⇒ ReleaseSlotResponse

Release a worker slot for a task.

Parameters:

  • step_run_id (String)

    The task run external ID

Returns:

  • (ReleaseSlotResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 192

def release_slot(step_run_id:)
  ensure_connected!

  request = ::ReleaseSlotRequest.new(
    task_run_external_id: step_run_id,
  )

  @stub.release_slot(request, metadata: @config.)
end

#send_step_action_event(action:, event_type:, payload: "{}", retry_count: nil, should_not_retry: nil) ⇒ ActionEventResponse

Send a step action event (completion/failure/started) back to the dispatcher.

Accepts the full action object (AssignedAction) so all StepActionEvent fields can be populated.

Parameters:

  • action (AssignedAction)

    The assigned action object

  • event_type (Symbol)

    Protobuf enum value (e.g., :STEP_EVENT_TYPE_COMPLETED)

  • payload (String) (defaults to: "{}")

    JSON-serialized event payload

  • retry_count (Integer, nil) (defaults to: nil)

    Current retry count

  • should_not_retry (Boolean, nil) (defaults to: nil)

    Whether to suppress further retries

Returns:

  • (ActionEventResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 142

def send_step_action_event(action:, event_type:, payload: "{}", retry_count: nil, should_not_retry: nil)
  ensure_connected!

  now = Time.now
  timestamp = Google::Protobuf::Timestamp.new(
    seconds: now.to_i,
    nanos: now.nsec,
  )

  event_args = {
    worker_id: @worker_id || "",
    job_id: action.job_id,
    job_run_id: action.job_run_id,
    task_id: action.task_id,
    task_run_external_id: action.task_run_external_id,
    action_id: action.action_id,
    event_timestamp: timestamp,
    event_type: event_type,
    event_payload: payload,
  }

  event_args[:retry_count] = retry_count unless retry_count.nil?
  event_args[:should_not_retry] = should_not_retry unless should_not_retry.nil?

  request = ::StepActionEvent.new(**event_args)
  @stub.send_step_action_event(request, metadata: @config.)
end
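
The optional fields are added only when they are not nil, which means an explicit false for should_not_retry is still sent. The sketch below reproduces that assembly without protobuf. Action is a Struct stand-in for AssignedAction, build_event_args is a hypothetical helper, the :STEP_EVENT_TYPE_FAILED enum name is an assumption, and the timestamp is left out to keep the example deterministic.

```ruby
require "json"

# Stand-in for the AssignedAction protobuf message (only the fields used here).
Action = Struct.new(:job_id, :job_run_id, :task_id, :task_run_external_id, :action_id,
                    keyword_init: true)

# Hypothetical helper mirroring how send_step_action_event assembles its fields.
def build_event_args(action:, event_type:, worker_id: nil, payload: "{}",
                     retry_count: nil, should_not_retry: nil)
  args = {
    worker_id: worker_id || "",
    job_id: action.job_id,
    job_run_id: action.job_run_id,
    task_id: action.task_id,
    task_run_external_id: action.task_run_external_id,
    action_id: action.action_id,
    event_type: event_type,
    event_payload: payload,
  }
  # Only nil is skipped, so an explicit `false` is still included.
  args[:retry_count] = retry_count unless retry_count.nil?
  args[:should_not_retry] = should_not_retry unless should_not_retry.nil?
  args
end

action = Action.new(job_id: "job-1", job_run_id: "jr-1", task_id: "task-1",
                    task_run_external_id: "run-1", action_id: "greet:hello")

completed = build_event_args(action: action, event_type: :STEP_EVENT_TYPE_COMPLETED,
                             payload: JSON.generate({ "greeting" => "hi" }))
# :STEP_EVENT_TYPE_FAILED is an assumed enum name used for illustration.
failed = build_event_args(action: action, event_type: :STEP_EVENT_TYPE_FAILED,
                          retry_count: 2, should_not_retry: false)

p completed.keys
p failed.keys
```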

#subscribe_to_workflow_runs(request_enum) ⇒ Enumerator<WorkflowRunEvent>

Open a bidirectional streaming subscription for workflow run events.

The caller provides an Enumerable (typically an Enumerator backed by a Queue) of SubscribeToWorkflowRunsRequest messages. The server streams back WorkflowRunEvent messages as workflow runs complete.

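Parameters:

  • request_enum (Enumerable<SubscribeToWorkflowRunsRequest>)

    Outgoing subscription requests

Returns:

  • (Enumerator<WorkflowRunEvent>)

    Stream of workflow run events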



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 235

def subscribe_to_workflow_runs(request_enum)
  ensure_connected!

  @stub.subscribe_to_workflow_runs(request_enum, metadata: @config.)
end
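
The "Enumerator backed by a Queue" pattern mentioned above can be sketched in plain Ruby. Strings stand in for SubscribeToWorkflowRunsRequest messages here, and the :done sentinel used to end the stream is an assumption of this example, not something the SDK defines.

```ruby
requests = Queue.new
done = :done # hypothetical sentinel that ends the request stream

# Lazily yields whatever is pushed onto the queue, blocking while it is empty.
request_enum = Enumerator.new do |y|
  loop do
    item = requests.pop # blocks until an item is available
    break if item == done

    y << item
  end
end

# A producer can push requests at any time, even after the stream is open.
requests << "workflow-run-1"
requests << "workflow-run-2"
requests << done

collected = request_enum.to_a
puts collected.inspect
```

In real use request_enum would be passed to subscribe_to_workflow_runs, and another thread would push new requests while the returned Enumerator is being consumed.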

#upsert_worker_labels(worker_id:, labels:) ⇒ UpsertWorkerLabelsResponse

Update worker labels.

Parameters:

  • worker_id (String)

    The worker ID

  • labels (Hash)

    New labels to upsert (String keys, String/Integer values)

Returns:

  • (UpsertWorkerLabelsResponse)



# File 'lib/hatchet/clients/grpc/dispatcher.rb', line 207

def upsert_worker_labels(worker_id:, labels:)
  ensure_connected!

  label_map = labels.each_with_object({}) do |(k, v), map|
    wl = if v.is_a?(Integer)
           ::WorkerLabels.new(int_value: v)
         else
           ::WorkerLabels.new(str_value: v.to_s)
         end
    map[k.to_s] = wl
  end

  request = ::UpsertWorkerLabelsRequest.new(
    worker_id: worker_id,
    labels: label_map,
  )

  @stub.upsert_worker_labels(request, metadata: @config.)
end
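
Both register and upsert_worker_labels coerce labels the same way: keys become Strings, Integer values go into int_value, and everything else is stringified into str_value. A minimal sketch of that coercion follows. LabelValue is a Struct stand-in for the generated WorkerLabels message and build_label_map is a hypothetical helper.

```ruby
# Stand-in for the generated WorkerLabels message.
LabelValue = Struct.new(:int_value, :str_value, keyword_init: true)

# Hypothetical helper reproducing the label coercion used by the client.
def build_label_map(labels)
  labels.each_with_object({}) do |(key, value), map|
    map[key.to_s] = if value.is_a?(Integer)
                      LabelValue.new(int_value: value)
                    else
                      # Non-integers (symbols, booleans, floats) become strings.
                      LabelValue.new(str_value: value.to_s)
                    end
  end
end

label_map = build_label_map({ region: "eu-west", gpus: 2, spot: true })
label_map.each { |key, label| puts "#{key}: #{label.to_h}" }
```

One consequence worth knowing is that a Float such as 1.5 is sent as the string "1.5", not as a number.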