Class: Hatchet::WorkflowRunListener

Inherits:
Object
Defined in:
lib/hatchet/worker/workflow_run_listener.rb

Overview

Thread-safe pooled gRPC listener for workflow run completion events.

Maintains a single bidirectional gRPC stream to `Dispatcher.SubscribeToWorkflowRuns`. Multiple callers share the stream; each subscription sends a request on the outgoing side and blocks until the matching `WorkflowRunEvent` arrives on the incoming side.

Modeled on the Python SDK's `PooledWorkflowRunListener`.
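The sharing pattern described above can be sketched without gRPC. This is a minimal illustration only: `SharedFanout`, `subscribe`, and `dispatch` are hypothetical names, and the real listener's internals may differ. It assumes each caller waits on its own `Queue` while a single reader delivers events by run ID.

```ruby
# Hypothetical illustration; SharedFanout is not part of the Hatchet SDK.
class SharedFanout
  def initialize
    @mu = Mutex.new
    # run_id -> Array<Queue>, one queue per waiting caller
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  # Register a per-caller queue for run_id and return it.
  def subscribe(run_id)
    queue = Queue.new
    @mu.synchronize { @subscriptions[run_id] << queue }
    queue
  end

  # Called by the single reader: hand the event to every caller waiting
  # on run_id and return how many callers were woken.
  def dispatch(run_id, event)
    waiters = @mu.synchronize { @subscriptions.delete(run_id) } || []
    waiters.each { |queue| queue << event }
    waiters.size
  end
end

fanout = SharedFanout.new
queues = 2.times.map { fanout.subscribe("run-1") }
waiters = queues.map { |queue| Thread.new { queue.pop } } # each caller blocks here
delivered = fanout.dispatch("run-1", { "my_task" => { "value" => 42 } })
results = waiters.map(&:value)
```

Both waiting threads receive the same event, and the subscription entry is removed once it has been delivered.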

Examples:

listener = WorkflowRunListener.new(config: config, channel: channel)
result = listener.result("workflow-run-id-123")
# => {"my_task" => {"value" => 42}}

Constant Summary

RETRY_INTERVAL = 3
    Seconds between reconnect attempts.

MAX_RETRIES = 5

DEDUPE_MESSAGE = "DUPLICATE_WORKFLOW_RUN"

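How the listener applies `RETRY_INTERVAL` and `MAX_RETRIES` is not shown on this page. The sketch below is one plausible reading, assuming a fixed delay between attempts and one initial attempt followed by up to `MAX_RETRIES` retries. `with_reconnect` and its `sleeper` parameter are hypothetical and exist only for illustration.

```ruby
RETRY_INTERVAL = 3 # seconds between reconnect attempts
MAX_RETRIES = 5

# Hypothetical helper, not part of the SDK. Yields the attempt number and
# retries on StandardError, re-raising once max_retries retries are used up.
def with_reconnect(max_retries: MAX_RETRIES, interval: RETRY_INTERVAL,
                   sleeper: ->(seconds) { sleep(seconds) })
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    raise if attempts > max_retries
    sleeper.call(interval)
    retry
  end
end

# Usage: record the delays instead of sleeping so the example runs instantly.
slept = []
value = with_reconnect(sleeper: ->(seconds) { slept << seconds }) do |attempt|
  raise IOError, "stream dropped" if attempt < 3
  "connected on attempt #{attempt}"
end
```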
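Instance Method Summary

  • #initialize(config:, channel:) ⇒ WorkflowRunListener (constructor)
    Returns a new instance of WorkflowRunListener.
  • #result(workflow_run_id) ⇒ Hash
    Subscribe to a workflow run and block until it finishes.
  • #shutdown ⇒ Object
    Stop the listener and clean up resources.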

Constructor Details

#initialize(config:, channel:) ⇒ WorkflowRunListener

Returns a new instance of WorkflowRunListener.



# File 'lib/hatchet/worker/workflow_run_listener.rb', line 25

def initialize(config:, channel:)
  @config  = config
  @channel = channel
  @logger  = config.logger

  # Outgoing request queue. The Enumerator fed to the bidi call
  # pulls from this queue.
  @request_queue = Queue.new

  # Subscription management (protected by @mu)
  @mu = Mutex.new
  # workflow_run_id -> Array<Queue>  (subscriber queues)
  @subscriptions = Hash.new { |h, k| h[k] = [] }

  # Background reader thread (lazy-started)
  @reader_thread = nil
  @started = false
end
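The constructor comment says an Enumerator fed to the bidirectional call pulls from `@request_queue`. A minimal sketch of that idea follows, assuming the Enumerator simply pops until the queue is closed. The real implementation is not shown here and may differ, and the request hashes are placeholders rather than actual protobuf messages.

```ruby
request_queue = Queue.new

# Yields each queued request in order. Queue#pop returns nil once the queue
# is closed and drained, which ends the loop (so a nil or false request
# would also end it).
requests = Enumerator.new do |yielder|
  while (request = request_queue.pop)
    yielder << request
  end
end

# Stand-in for the gRPC call consuming the outgoing side of the stream.
consumer = Thread.new { requests.to_a }

request_queue << { workflow_run_id: "run-1" }
request_queue << { workflow_run_id: "run-2" }
request_queue.close # no further requests; the Enumerator finishes

sent = consumer.value
```

Closing the queue is what lets the consumer finish: a blocked `pop` wakes up and returns `nil` instead of waiting forever.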

Instance Method Details

#result(workflow_run_id) ⇒ Hash

Subscribe to a workflow run and block until it finishes.

Returns the result hash keyed by task readable_id, e.g.

{"step1" => {"value" => 42}, "step2" => {...}}

Blocks indefinitely until the run reaches a terminal state.

Parameters:

  • workflow_run_id (String)

Returns:

  • (Hash)

    Task results keyed by task readable_id

Raises:



# File 'lib/hatchet/worker/workflow_run_listener.rb', line 55

def result(workflow_run_id)
  event = subscribe(workflow_run_id)
  parse_event(event)
end
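Because `#result` blocks indefinitely, a caller that needs a deadline has to impose one itself. The sketch below shows one caller-side approach, waiting on a helper thread with `Thread#join(limit)`. `FakeListener` and `result_within` are hypothetical stand-ins so the example runs without a gRPC connection. Note that on timeout the helper thread keeps waiting in the background, and nothing on this page describes cancelling a subscription.

```ruby
# Hypothetical stand-in for WorkflowRunListener; it only simulates a delay.
class FakeListener
  def initialize(delay:)
    @delay = delay
  end

  def result(_workflow_run_id)
    sleep(@delay)
    { "my_task" => { "value" => 42 } }
  end
end

# Hypothetical helper: returns the result hash, or nil if `seconds` elapse
# before the run finishes.
def result_within(listener, workflow_run_id, seconds)
  waiter = Thread.new { listener.result(workflow_run_id) }
  # Thread#join returns nil when the limit expires before the thread ends.
  waiter.join(seconds) ? waiter.value : nil
end

fast = result_within(FakeListener.new(delay: 0.01), "workflow-run-id-123", 1)
slow = result_within(FakeListener.new(delay: 5), "workflow-run-id-123", 0.05)
```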

#shutdown ⇒ Object

Stop the listener and clean up resources.



# File 'lib/hatchet/worker/workflow_run_listener.rb', line 61

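def shutdown
  # Close the outgoing request queue so no further requests can be enqueued.
  @request_queue.close
  # Wait up to 5 seconds for the reader thread (if it was started) to exit.
  @reader_thread&.join(5)
end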