Class: Hatchet::AdminClient

Inherits:
Object
Defined in:
lib/hatchet-sdk.rb

Overview

Admin client for triggering and scheduling workflows.

Delegates to the gRPC admin client for actual RPC calls, while handling context variable propagation for parent-child workflow linking.

Constructor Details

#initialize(client:) ⇒ AdminClient

Returns a new instance of AdminClient.



# File 'lib/hatchet-sdk.rb', line 320

def initialize(client:)
  @client = client
  @spawn_indices = ContextVars::SpawnIndexTracker.new
end

Instance Method Details

#schedule_workflow(workflow, time, input: {}, options: nil) ⇒ Object

Schedule a workflow for future execution.

Parameters:

  • workflow (Workflow, Task, String)

    The workflow to schedule

  • time (Time)

    When to execute

  • input (Hash) (defaults to: {})

    Workflow input

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Schedule options

Returns:

  • (Object)

    Schedule response



# File 'lib/hatchet-sdk.rb', line 418

def schedule_workflow(workflow, time, input: {}, options: nil)
  name = workflow.respond_to?(:name) ? workflow.name : workflow.to_s
  opts = build_trigger_options(options)
  @client.admin_grpc.schedule_workflow(name, run_at: time, input: input, options: opts)
end
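
The first line of the method body resolves the workflow name by duck typing: anything that responds to `name` is asked for it, and everything else is converted with `to_s`. The trigger methods below use the same line. A minimal sketch of that step in isolation, where `FakeWorkflow` and `resolve_workflow_name` are hypothetical names introduced only for illustration and are not part of the SDK:

```ruby
# Hypothetical stand-in for a workflow object; the real Workflow and Task
# classes are assumed to expose a #name method.
FakeWorkflow = Struct.new(:name)

# Mirrors the name-resolution line used by the schedule and trigger methods.
def resolve_workflow_name(workflow)
  workflow.respond_to?(:name) ? workflow.name : workflow.to_s
end

from_object = resolve_workflow_name(FakeWorkflow.new("send-email"))
from_string = resolve_workflow_name("send-email")
# Both calls resolve to the same workflow name string.
```

Passing a plain string therefore works anywhere a Workflow or Task is accepted, as the parameter types above indicate.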

#trigger_workflow(workflow_or_task, input, options: nil) ⇒ Hash

Trigger a workflow run and wait for its result.

Parameters:

  • workflow_or_task (Workflow, Task, String)

    The workflow or task to trigger

  • input (Hash)

    Workflow input

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (Hash)

    The workflow run result



# File 'lib/hatchet-sdk.rb', line 331

def trigger_workflow(workflow_or_task, input, options: nil)
  ref = trigger_workflow_no_wait(workflow_or_task, input, options: options)
  ref.result
end

#trigger_workflow_many(workflow_or_task, items, return_exceptions: false) ⇒ Array

Trigger many workflow runs and wait for all results.

Parameters:

  • workflow_or_task (Workflow, Task, String)

    The workflow or task to trigger

  • items (Array<Hash>)

    Array of { input:, options: } items

  • return_exceptions (Boolean) (defaults to: false)

    Return exceptions instead of raising

Returns:

  • (Array)

    Results or exceptions



# File 'lib/hatchet-sdk.rb', line 362

def trigger_workflow_many(workflow_or_task, items, return_exceptions: false)
  refs = trigger_workflow_many_no_wait(workflow_or_task, items)

  # Collect results concurrently using threads so that all subscriptions
  # are sent at once rather than serially waiting for each one.
  threads = refs.map do |ref|
    Thread.new do
      if return_exceptions
        begin
          ref.result
        rescue StandardError => e
          e
        end
      else
        ref.result
      end
    end
  end

  threads.map(&:value)
end
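
The effect of `return_exceptions` can be seen without a running Hatchet server. The sketch below reproduces the thread-per-ref collection step against a hypothetical `FakeRunRef` whose `result` either returns a value or raises. `FakeRunRef` and `collect_results` are illustrative names rather than SDK classes, and the `report_on_exception` line is an addition that only keeps the demo's stderr quiet.

```ruby
# Hypothetical stand-in for WorkflowRunRef: #result returns a value or raises.
class FakeRunRef
  def initialize(value = nil, error: nil)
    @value = value
    @error = error
  end

  def result
    raise @error if @error
    @value
  end
end

# Same shape as the collection step in trigger_workflow_many.
def collect_results(refs, return_exceptions: false)
  threads = refs.map do |ref|
    Thread.new do
      # Demo-only: suppress the "terminated with exception" message on stderr.
      Thread.current.report_on_exception = false
      begin
        ref.result
      rescue StandardError => e
        # Hand the exception back as a value, or let it end the thread.
        raise unless return_exceptions
        e
      end
    end
  end

  # Thread#value joins the thread and re-raises whatever terminated it.
  threads.map(&:value)
end

refs = [
  FakeRunRef.new({ "ok" => true }),
  FakeRunRef.new(error: RuntimeError.new("boom")),
]

# One entry per ref, in order: a result hash, then the RuntimeError instance.
mixed = collect_results(refs, return_exceptions: true)
```

With `return_exceptions: false`, the first failing ref in array order raises out of `threads.map(&:value)`, so the caller gets no partial results, while the remaining threads keep running in the background.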

#trigger_workflow_many_no_wait(workflow_or_task, items) ⇒ Array<WorkflowRunRef>

Trigger many workflow runs without waiting.

Uses bulk gRPC triggering for efficiency (batched by 1000).

Parameters:

  • workflow_or_task (Workflow, Task, String)

    The workflow or task to trigger

  • items (Array<Hash>)

    Array of { input:, options: } items

Returns:

  • (Array<WorkflowRunRef>)

    One reference per triggered run



# File 'lib/hatchet-sdk.rb', line 391

def trigger_workflow_many_no_wait(workflow_or_task, items)
  name = workflow_or_task.respond_to?(:name) ? workflow_or_task.name : workflow_or_task.to_s

  # Build trigger items with context vars for parent-child linking
  trigger_items = items.map do |item|
    input = item[:input] || {}
    opts = build_trigger_options(item[:options])
    { input: input, options: opts }
  end

  run_ids = @client.admin_grpc.bulk_trigger_workflow(name, trigger_items)
  run_ids.map do |run_id|
    WorkflowRunRef.new(
      workflow_run_id: run_id,
      client: @client,
      listener: @client.workflow_run_listener,
    )
  end
end
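
Each item is read with symbol keys (`item[:input]`, `item[:options]`), and a missing input falls back to an empty hash. The sketch below isolates that normalization, assuming items are plain Ruby hashes. `normalize_items` is a hypothetical helper, and because `build_trigger_options` is private and not shown in this section, options are passed through unchanged here.

```ruby
# Mirrors the per-item normalization in trigger_workflow_many_no_wait.
# Assumption: build_trigger_options is omitted, so options pass through as-is.
def normalize_items(items)
  items.map do |item|
    { input: item[:input] || {}, options: item[:options] }
  end
end

# Symbol-keyed items: the first keeps its input, the second defaults to {}.
symbol_keyed = normalize_items([{ input: { "user_id" => 1 } }, {}])

# String-keyed items are not matched by item[:input], so the input is
# silently replaced by {} instead of raising an error.
string_keyed = normalize_items([{ "input" => { "user_id" => 2 } }])
```

Under that assumption, items built from parsed JSON or other string-keyed sources would need their keys symbolized before being passed in.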

#trigger_workflow_no_wait(workflow_or_task, input, options: nil) ⇒ WorkflowRunRef

Trigger a workflow run without waiting for the result.

Parameters:

  • workflow_or_task (Workflow, Task, String)

    The workflow or task to trigger

  • input (Hash)

    Workflow input

  • options (TriggerWorkflowOptions, nil) (defaults to: nil)

    Trigger options

Returns:

  • (WorkflowRunRef)

    Reference to the triggered run



# File 'lib/hatchet-sdk.rb', line 342

def trigger_workflow_no_wait(workflow_or_task, input, options: nil)
  name = workflow_or_task.respond_to?(:name) ? workflow_or_task.name : workflow_or_task.to_s

  # Merge user options with context vars for parent-child linking
  opts = build_trigger_options(options)

  run_id = @client.admin_grpc.trigger_workflow(name, input: input, options: opts)
  WorkflowRunRef.new(
    workflow_run_id: run_id,
    client: @client,
    listener: @client.workflow_run_listener,
  )
end