Class: Hatchet::Clients::Grpc::Admin

Inherits:
Object
  • Object
Defined in:
lib/hatchet/clients/grpc/admin.rb

Overview

gRPC client for the Hatchet Admin service (workflow registration & triggering).

Uses two stubs on the shared channel:

  • V1::AdminService::Stub (v1) for: PutWorkflow, GetRunDetails, CancelTasks, ReplayTasks

  • WorkflowService::Stub (v0) for: TriggerWorkflow, BulkTriggerWorkflow, ScheduleWorkflow, PutRateLimit

The v0 WorkflowService is retained for triggering because it supports parent-child linking fields that the v1 TriggerWorkflowRun does not expose.

Examples:

admin = Admin.new(config: hatchet_config, channel: channel)
admin.put_workflow(workflow.to_proto(config))
ref = admin.trigger_workflow("MyWorkflow", input: { "key" => "value" })

Constant Summary

BULK_TRIGGER_BATCH_SIZE =
1000
COMPARATOR_MAP =
{
  equal: :EQUAL, not_equal: :NOT_EQUAL,
  greater_than: :GREATER_THAN, greater_than_or_equal: :GREATER_THAN_OR_EQUAL,
  less_than: :LESS_THAN, less_than_or_equal: :LESS_THAN_OR_EQUAL,
}.freeze
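
This page does not show where COMPARATOR_MAP is consumed. Presumably it translates Ruby-style comparator names into the enum names used when comparing worker labels. The sketch below shows one way such a lookup could work. `ComparatorSketch` and `resolve` are hypothetical names for illustration and are not part of the documented API.

```ruby
module ComparatorSketch
  # Copy of the constant documented above.
  COMPARATOR_MAP = {
    equal: :EQUAL, not_equal: :NOT_EQUAL,
    greater_than: :GREATER_THAN, greater_than_or_equal: :GREATER_THAN_OR_EQUAL,
    less_than: :LESS_THAN, less_than_or_equal: :LESS_THAN_OR_EQUAL,
  }.freeze

  # Hypothetical helper: accepts a Symbol or String and raises on unknown
  # comparator names instead of silently returning nil.
  def self.resolve(name)
    COMPARATOR_MAP.fetch(name.to_sym) do
      raise ArgumentError, "unknown comparator: #{name.inspect}"
    end
  end
end

resolved = %i[equal greater_than].map { |c| ComparatorSketch.resolve(c) }
puts resolved.inspect
```

Using `fetch` with a block rather than `[]` means a typo in a comparator name fails loudly at the call site.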

Constructor Details

#initialize(config:, channel:) ⇒ Admin

Returns a new instance of Admin.

Parameters:

  • config (Hatchet::Config)

    The Hatchet configuration

  • channel (GRPC::Core::Channel)

    Shared gRPC channel



# File 'lib/hatchet/clients/grpc/admin.rb', line 27

def initialize(config:, channel:)
  @config = config
  @logger = config.logger
  @channel = channel
  @v0_stub = nil
  @v1_stub = nil
end

Instance Method Details

#bulk_trigger_workflow(workflow_name, items) ⇒ Array<String>

Trigger multiple workflow runs in bulk via v0 WorkflowService.

Requests are sent in batches of BULK_TRIGGER_BATCH_SIZE (1000), one RPC call per batch.

Parameters:

  • workflow_name (String)

    The workflow name (will be namespaced)

  • items (Array<Hash>)

    Array of { input:, options: } items

Returns:

  • (Array<String>)

    Array of workflow run IDs
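
As a rough illustration of the `items` argument, the sketch below builds a small array and applies the same defaulting the method's source applies to each item (`item[:input] || {}` and `item[:options] || {}`). The workflow inputs and option values are made up for the example, and no RPC is made.

```ruby
require "json"

# Each item is a Hash with optional :input and :options keys.
items = [
  { input: { "user_id" => 1 } },
  { input: { "user_id" => 2 }, options: { priority: 2, child_key: "user-2" } },
  {}, # both keys omitted
]

# Mirrors the per-item defaulting done before each request is built.
normalized = items.map do |item|
  { input: JSON.generate(item[:input] || {}), options: item[:options] || {} }
end

normalized.each { |n| puts n.inspect }
```

An item with neither key is still valid: it is sent with an empty JSON object as input and no trigger options.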



# File 'lib/hatchet/clients/grpc/admin.rb', line 111

def bulk_trigger_workflow(workflow_name, items)
  ensure_connected!

  name = @config.apply_namespace(workflow_name)

  requests = items.map do |item|
    input = item[:input] || {}
    opts = item[:options] || {}

    request_args = {
      name: name,
      input: JSON.generate(input),
    }

    request_args[:parent_id] = opts[:parent_id] if opts[:parent_id]
    request_args[:parent_task_run_external_id] = opts[:parent_task_run_external_id] if opts[:parent_task_run_external_id]
    request_args[:child_index] = opts[:child_index] if opts[:child_index]
    request_args[:child_key] = opts[:child_key] if opts[:child_key]
    request_args[:desired_worker_id] = opts[:desired_worker_id] if opts[:desired_worker_id]
    request_args[:priority] = opts[:priority] if opts[:priority]

    if opts[:additional_metadata]
      request_args[:additional_metadata] = if opts[:additional_metadata].is_a?(String)
                                             opts[:additional_metadata]
                                           else
                                             JSON.generate(opts[:additional_metadata])
                                           end
    end

    request_args[:desired_worker_labels] = build_trigger_worker_labels(opts[:desired_worker_labels]) if opts[:desired_worker_labels]

    ::V1::TriggerWorkflowRequest.new(**request_args)
  end

  # Batch in groups of BULK_TRIGGER_BATCH_SIZE
  all_run_ids = []
  begin
    requests.each_slice(BULK_TRIGGER_BATCH_SIZE) do |batch|
      bulk_request = ::BulkTriggerWorkflowRequest.new(workflows: batch)
      response = @v0_stub.bulk_trigger_workflow(bulk_request, metadata: @config.)
      all_run_ids.concat(response.workflow_run_ids.to_a)
    end
  rescue ::GRPC::ResourceExhausted => e
    raise ResourceExhaustedError, e.message
  rescue ::GRPC::BadStatus => e
    raise Error, "gRPC error triggering bulk workflow: #{e.class}: #{e.message}"
  end

  all_run_ids
end
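
The batching step in the listing above can be sketched in isolation. This is a minimal stand-alone version, assuming a block stands in for the `bulk_trigger_workflow` RPC. `BulkBatchSketch` and `send_in_batches` are hypothetical names, and the run IDs are fabricated strings.

```ruby
module BulkBatchSketch
  BATCH_SIZE = 1000 # same value as BULK_TRIGGER_BATCH_SIZE

  # Yields each batch to the block (a stand-in for the RPC) and
  # concatenates the IDs returned for every batch.
  def self.send_in_batches(requests, batch_size: BATCH_SIZE)
    requests.each_slice(batch_size).flat_map { |batch| yield(batch) }
  end
end

requests = Array.new(2500) { |i| "request-#{i}" }
batch_sizes = []

run_ids = BulkBatchSketch.send_in_batches(requests) do |batch|
  batch_sizes << batch.size
  batch.map { |r| "run-for-#{r}" } # fake run IDs
end

puts batch_sizes.inspect
puts run_ids.size
```

One consequence of the structure in the listing: if a later batch raises, the exception propagates and the run IDs collected from earlier batches are not returned, even though those runs were already triggered.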

#close ⇒ Object

Release the cached v0 and v1 stubs. As the source below shows, the shared channel itself is not closed here.



# File 'lib/hatchet/clients/grpc/admin.rb', line 243

def close
  @v0_stub = nil
  @v1_stub = nil
end

#get_run_details(external_id:) ⇒ V1::GetRunDetailsResponse

Get run details via v1 AdminService.

Parameters:

  • external_id (String)

    The workflow run external ID

Returns:

  • (V1::GetRunDetailsResponse)



# File 'lib/hatchet/clients/grpc/admin.rb', line 217

def get_run_details(external_id:)
  ensure_connected!

  request = ::V1::GetRunDetailsRequest.new(external_id: external_id)
  @v1_stub.get_run_details(request, metadata: @config.)
end

#put_rate_limit(key:, limit:, duration:) ⇒ PutRateLimitResponse

Put a rate limit via v0 WorkflowService.

Parameters:

  • key (String)

    Rate limit key

  • limit (Integer)

    Rate limit value

  • duration (Symbol)

    Rate limit duration enum

Returns:

  • (PutRateLimitResponse)



# File 'lib/hatchet/clients/grpc/admin.rb', line 230

def put_rate_limit(key:, limit:, duration:)
  ensure_connected!

  request = ::PutRateLimitRequest.new(
    key: key,
    limit: limit,
    duration: duration,
  )

  @v0_stub.put_rate_limit(request, metadata: @config.)
end

#put_workflow(workflow_proto) ⇒ V1::CreateWorkflowVersionResponse

Register a workflow definition with the server via v1 AdminService.

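Parameters:

  • workflow_proto

    The workflow definition as a protobuf message, for example the result of workflow.to_proto(config)

Returns:

  • (V1::CreateWorkflowVersionResponse)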



# File 'lib/hatchet/clients/grpc/admin.rb', line 39

def put_workflow(workflow_proto)
  ensure_connected!

  response = @v1_stub.put_workflow(workflow_proto, metadata: @config.)
  @logger.debug("Registered workflow: #{workflow_proto.name}")
  response
end

#schedule_workflow(workflow_name, run_at:, input: {}, options: {}) ⇒ WorkflowVersion

Schedule a workflow for future execution via v0 WorkflowService.

Parameters:

  • workflow_name (String)

    The workflow name (will be namespaced)

  • run_at (Time)

    When to run

  • input (Hash) (defaults to: {})

    Workflow input

  • options (Hash) (defaults to: {})

    Trigger options

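Returns:

  • (WorkflowVersion)

Raises:

  • (DedupeViolationError)

    If the server responds with GRPC::AlreadyExists

  • (ResourceExhaustedError)

    If the server responds with GRPC::ResourceExhausted

  • (Error)

    On any other GRPC::BadStatus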



# File 'lib/hatchet/clients/grpc/admin.rb', line 170

def schedule_workflow(workflow_name, run_at:, input: {}, options: {})
  ensure_connected!

  name = @config.apply_namespace(workflow_name)

  schedule_timestamp = Google::Protobuf::Timestamp.new(
    seconds: run_at.to_i,
    nanos: run_at.respond_to?(:nsec) ? run_at.nsec : 0,
  )

  request_args = {
    name: name,
    schedules: [schedule_timestamp],
    input: JSON.generate(input),
  }

  request_args[:parent_id] = options[:parent_id] if options[:parent_id]
  request_args[:parent_task_run_external_id] = options[:parent_task_run_external_id] if options[:parent_task_run_external_id]
  request_args[:child_index] = options[:child_index] if options[:child_index]
  request_args[:child_key] = options[:child_key] if options[:child_key]
  request_args[:priority] = options[:priority] if options[:priority]

  if options[:additional_metadata]
    request_args[:additional_metadata] = if options[:additional_metadata].is_a?(String)
                                           options[:additional_metadata]
                                         else
                                           JSON.generate(options[:additional_metadata])
                                         end
  end

  request = ::ScheduleWorkflowRequest.new(**request_args)

  begin
    @v0_stub.schedule_workflow(request, metadata: @config.)
  rescue ::GRPC::AlreadyExists => e
    raise DedupeViolationError, "Deduplication violation: #{e.message}"
  rescue ::GRPC::ResourceExhausted => e
    raise ResourceExhaustedError, e.message
  rescue ::GRPC::BadStatus => e
    raise Error, "gRPC error scheduling workflow: #{e.class}: #{e.message}"
  end
end
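
The `run_at` conversion in the listing above splits a time into whole seconds and a nanosecond remainder. The sketch below reproduces that step with a Struct standing in for Google::Protobuf::Timestamp, so it runs without the protobuf gem. `TimestampSketch` and `to_timestamp` are hypothetical names.

```ruby
# Stand-in for Google::Protobuf::Timestamp (assumption: only the
# seconds and nanos fields matter for this illustration).
TimestampSketch = Struct.new(:seconds, :nanos, keyword_init: true)

# Same field mapping as the listing: whole seconds from #to_i, and
# nanoseconds from #nsec when the object provides it, else 0.
def to_timestamp(run_at)
  TimestampSketch.new(
    seconds: run_at.to_i,
    nanos: run_at.respond_to?(:nsec) ? run_at.nsec : 0,
  )
end

from_time = to_timestamp(Time.at(1_700_000_000, 123_456_789, :nsec))
from_integer = to_timestamp(1_700_000_000) # Integer has no #nsec

puts from_time.to_a.inspect
puts from_integer.to_a.inspect
```

The `respond_to?(:nsec)` guard means anything that answers `to_i`, such as an Integer epoch value, is accepted and simply loses sub-second precision.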

#trigger_workflow(workflow_name, input: {}, options: {}) ⇒ String

Trigger a workflow run via v0 WorkflowService.

Parameters:

  • workflow_name (String)

    The workflow name (will be namespaced)

  • input (Hash) (defaults to: {})

    Workflow input

  • options (Hash) (defaults to: {})

    Trigger options

Options Hash (options:):

  • :parent_id (String)

    Parent workflow run ID

  • :parent_task_run_external_id (String)

    Parent step run ID

  • :child_index (Integer)

    Child workflow index

  • :child_key (String)

    Child workflow key

  • :additional_metadata (Hash, String)

    Additional metadata. A String is sent as-is; any other value is serialized with JSON.generate

  • :desired_worker_id (String)

    Desired worker for sticky dispatch

  • :priority (Integer)

    Priority value
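
To make the handling of these options concrete, here is a sketch of how the request arguments are assembled from them, condensed from the method's source: listed keys are copied only when truthy, and `:additional_metadata` is JSON-encoded unless it is already a String. `TriggerOptionsSketch` and its methods are hypothetical helpers, not part of the class, and `:desired_worker_labels` is left out because its builder is not shown on this page.

```ruby
require "json"

module TriggerOptionsSketch
  # Option keys copied verbatim into the request when present.
  PASSTHROUGH_KEYS = %i[
    parent_id parent_task_run_external_id child_index child_key
    desired_worker_id priority
  ].freeze

  # Strings pass through unchanged; anything else becomes JSON.
  def self.encode_metadata(metadata)
    metadata.is_a?(String) ? metadata : JSON.generate(metadata)
  end

  def self.request_args(name, input: {}, options: {})
    args = { name: name, input: JSON.generate(input) }
    PASSTHROUGH_KEYS.each { |key| args[key] = options[key] if options[key] }
    if options[:additional_metadata]
      args[:additional_metadata] = encode_metadata(options[:additional_metadata])
    end
    args
  end
end

args = TriggerOptionsSketch.request_args(
  "MyWorkflow",
  input: { "key" => "value" },
  options: { priority: 3, child_key: nil, additional_metadata: { "source" => "docs" } },
)
puts args.inspect
```

Because the check is truthiness rather than key presence, an option explicitly set to `nil` or `false` is omitted from the request, while `0` is kept (it is truthy in Ruby).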

Returns:

  • (String)

    The workflow run ID

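Raises:

  • (DedupeViolationError)

    If the server responds with GRPC::AlreadyExists

  • (ResourceExhaustedError)

    If the server responds with GRPC::ResourceExhausted

  • (Error)

    On any other GRPC::BadStatus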



# File 'lib/hatchet/clients/grpc/admin.rb', line 61

def trigger_workflow(workflow_name, input: {}, options: {})
  ensure_connected!

  name = @config.apply_namespace(workflow_name)

  request_args = {
    name: name,
    input: JSON.generate(input),
  }

  request_args[:parent_id] = options[:parent_id] if options[:parent_id]
  request_args[:parent_task_run_external_id] = options[:parent_task_run_external_id] if options[:parent_task_run_external_id]
  request_args[:child_index] = options[:child_index] if options[:child_index]
  request_args[:child_key] = options[:child_key] if options[:child_key]
  request_args[:desired_worker_id] = options[:desired_worker_id] if options[:desired_worker_id]
  request_args[:priority] = options[:priority] if options[:priority]

  if options[:additional_metadata]
    request_args[:additional_metadata] = if options[:additional_metadata].is_a?(String)
                                           options[:additional_metadata]
                                         else
                                           JSON.generate(options[:additional_metadata])
                                         end
  end

  if options[:desired_worker_labels]
    request_args[:desired_worker_labels] = build_trigger_worker_labels(options[:desired_worker_labels])
  end

  request = ::V1::TriggerWorkflowRequest.new(**request_args)

  begin
    response = @v0_stub.trigger_workflow(request, metadata: @config.)
    response.workflow_run_id
  rescue ::GRPC::AlreadyExists => e
    raise DedupeViolationError, "Deduplication violation: #{e.message}"
  rescue ::GRPC::ResourceExhausted => e
    raise ResourceExhaustedError, e.message
  rescue ::GRPC::BadStatus => e
    raise Error, "gRPC error triggering workflow: #{e.class}: #{e.message}"
  end
end
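
The rescue chain in the listing above depends on clause order: in the grpc gem, GRPC::AlreadyExists and GRPC::ResourceExhausted are subclasses of GRPC::BadStatus, so the specific clauses must come before the general one. The sketch below demonstrates this with stand-in exception classes so that it runs without the grpc or hatchet gems. Every class defined here is a stand-in, and the inheritance of the Hatchet error classes from `Error` is an assumption.

```ruby
module ErrorMappingSketch
  # Stand-ins mirroring the grpc gem's hierarchy.
  class BadStatus < StandardError; end
  class AlreadyExists < BadStatus; end
  class ResourceExhausted < BadStatus; end

  # Stand-ins for the Hatchet error classes named in the listing
  # (their real hierarchy is assumed, not shown on this page).
  class Error < StandardError; end
  class DedupeViolationError < Error; end
  class ResourceExhaustedError < Error; end

  # Runs the block and translates transport errors, most specific first.
  def self.translate
    yield
  rescue AlreadyExists => e
    raise DedupeViolationError, "Deduplication violation: #{e.message}"
  rescue ResourceExhausted => e
    raise ResourceExhaustedError, e.message
  rescue BadStatus => e
    raise Error, "gRPC error triggering workflow: #{e.class}: #{e.message}"
  end
end

# Small helper for the demo: returns the class of the raised error, or nil.
def raised_class
  yield
  nil
rescue StandardError => e
  e.class
end

dedupe = raised_class { ErrorMappingSketch.translate { raise ErrorMappingSketch::AlreadyExists, "dup" } }
exhausted = raised_class { ErrorMappingSketch.translate { raise ErrorMappingSketch::ResourceExhausted, "quota" } }
generic = raised_class { ErrorMappingSketch.translate { raise ErrorMappingSketch::BadStatus, "boom" } }
success = ErrorMappingSketch.translate { "run-id" }

puts [dedupe, exhausted, generic, success].inspect
```

If the `BadStatus` clause were listed first, it would catch the two subclasses as well and the dedicated error types would never be raised.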