Class: Hatchet::AdminClient
- Inherits:
-
Object
- Object
- Hatchet::AdminClient
- Defined in:
- lib/hatchet-sdk.rb
Overview
Admin client for triggering and scheduling workflows.
Delegates to the gRPC admin client for actual RPC calls, while handling context variable propagation for parent-child workflow linking.
Instance Method Summary collapse
-
#initialize(client:) ⇒ AdminClient
constructor
A new instance of AdminClient.
-
#schedule_workflow(workflow, time, input: {}, options: nil) ⇒ Object
Schedule a workflow for future execution.
-
#trigger_workflow(workflow_or_task, input, options: nil) ⇒ Hash
Trigger a workflow run and wait for result.
-
#trigger_workflow_many(workflow_or_task, items, return_exceptions: false) ⇒ Array
Trigger many workflow runs and wait for all results.
-
#trigger_workflow_many_no_wait(workflow_or_task, items) ⇒ Array<WorkflowRunRef>
Trigger many workflow runs without waiting.
-
#trigger_workflow_no_wait(workflow_or_task, input, options: nil) ⇒ WorkflowRunRef
Trigger a workflow run without waiting for the result.
Constructor Details
#initialize(client:) ⇒ AdminClient
Returns a new instance of AdminClient.
320 321 322 323 |
# File 'lib/hatchet-sdk.rb', line 320 def initialize(client:) @client = client @spawn_indices = ContextVars::SpawnIndexTracker.new end |
Instance Method Details
#schedule_workflow(workflow, time, input: {}, options: nil) ⇒ Object
Schedule a workflow for future execution.
418 419 420 421 422 |
# File 'lib/hatchet-sdk.rb', line 418 def schedule_workflow(workflow, time, input: {}, options: nil) name = workflow.respond_to?(:name) ? workflow.name : workflow.to_s opts = () @client.admin_grpc.schedule_workflow(name, run_at: time, input: input, options: opts) end |
#trigger_workflow(workflow_or_task, input, options: nil) ⇒ Hash
Trigger a workflow run and wait for result.
331 332 333 334 |
# File 'lib/hatchet-sdk.rb', line 331 def trigger_workflow(workflow_or_task, input, options: nil) ref = trigger_workflow_no_wait(workflow_or_task, input, options: ) ref.result end |
#trigger_workflow_many(workflow_or_task, items, return_exceptions: false) ⇒ Array
Trigger many workflow runs and wait for all results.
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 |
# File 'lib/hatchet-sdk.rb', line 362 def trigger_workflow_many(workflow_or_task, items, return_exceptions: false) refs = trigger_workflow_many_no_wait(workflow_or_task, items) # Collect results concurrently using threads so that all subscriptions # are sent at once rather than serially waiting for each one. threads = refs.map do |ref| Thread.new do if return_exceptions begin ref.result rescue StandardError => e e end else ref.result end end end threads.map(&:value) end |
#trigger_workflow_many_no_wait(workflow_or_task, items) ⇒ Array<WorkflowRunRef>
Trigger many workflow runs without waiting.
Uses bulk gRPC triggering for efficiency (batched by 1000).
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/hatchet-sdk.rb', line 391 def trigger_workflow_many_no_wait(workflow_or_task, items) name = workflow_or_task.respond_to?(:name) ? workflow_or_task.name : workflow_or_task.to_s # Build trigger items with context vars for parent-child linking trigger_items = items.map do |item| input = item[:input] || {} opts = (item[:options]) { input: input, options: opts } end run_ids = @client.admin_grpc.bulk_trigger_workflow(name, trigger_items) run_ids.map do |run_id| WorkflowRunRef.new( workflow_run_id: run_id, client: @client, listener: @client.workflow_run_listener, ) end end |
#trigger_workflow_no_wait(workflow_or_task, input, options: nil) ⇒ WorkflowRunRef
Trigger a workflow run without waiting for the result.
342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/hatchet-sdk.rb', line 342 def trigger_workflow_no_wait(workflow_or_task, input, options: nil) name = workflow_or_task.respond_to?(:name) ? workflow_or_task.name : workflow_or_task.to_s # Merge user options with context vars for parent-child linking opts = () run_id = @client.admin_grpc.trigger_workflow(name, input: input, options: opts) WorkflowRunRef.new( workflow_run_id: run_id, client: @client, listener: @client.workflow_run_listener, ) end |