Class: Hatchet::WorkflowRunListener
- Inherits: Object
  - Object
  - Hatchet::WorkflowRunListener
- Defined in: lib/hatchet/worker/workflow_run_listener.rb
Overview
Thread-safe pooled gRPC listener for workflow run completion events.
Maintains a single bidirectional gRPC stream to `Dispatcher.SubscribeToWorkflowRuns`. Multiple callers share the stream; each subscription sends a request on the outgoing side and blocks until the matching `WorkflowRunEvent` arrives on the incoming side.
Modeled on the Python SDK's `PooledWorkflowRunListener`.
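The fan-out described above can be illustrated without gRPC. The following is a minimal sketch, not the listener's actual implementation: `PooledListenerSketch`, `wait_for`, `dispatch`, and `waiter_count` are hypothetical names, and the event is a plain Ruby value rather than a `WorkflowRunEvent`. Each caller registers its own `Queue` under the run ID and blocks on it; a single dispatcher delivers the event to every registered queue.

```ruby
# Hypothetical stand-in for a pooled listener; no gRPC stream is involved.
class PooledListenerSketch
  def initialize
    @mu = Mutex.new
    # run_id -> Array<Queue>, one queue per blocked caller
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  # Register a per-caller queue, then block until an event is delivered.
  def wait_for(run_id)
    inbox = Queue.new
    @mu.synchronize { @subscriptions[run_id] << inbox }
    inbox.pop
  end

  # Deliver an event to every caller waiting on run_id.
  # Returns the number of callers that were woken.
  def dispatch(run_id, event)
    waiters = @mu.synchronize { @subscriptions.delete(run_id) } || []
    waiters.each { |inbox| inbox << event }
    waiters.size
  end

  # Number of callers currently blocked on run_id.
  def waiter_count(run_id)
    @mu.synchronize { @subscriptions.fetch(run_id, []).size }
  end
end

listener = PooledListenerSketch.new
callers = 2.times.map { Thread.new { listener.wait_for("run-1") } }
sleep 0.01 until listener.waiter_count("run-1") == 2
listener.dispatch("run-1", { "step1" => { "value" => 42 } })
results = callers.map(&:value) # both callers receive the same event
```

Removing the run ID from the hash inside the mutex and pushing to the queues outside it keeps the critical section short, which is one reasonable design choice among several.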
Constant Summary
- RETRY_INTERVAL = 3
  Seconds between reconnect attempts.
- MAX_RETRIES = 5
- DEDUPE_MESSAGE = "DUPLICATE_WORKFLOW_RUN"
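This page does not show how the listener applies the retry constants. One plausible reading is a bounded reconnect loop, sketched below under that assumption. `with_retries` and its `sleeper` parameter are hypothetical helpers introduced for illustration, and treating `MAX_RETRIES` as the total number of attempts is also an assumption.

```ruby
RETRY_INTERVAL = 3 # seconds between reconnect attempts
MAX_RETRIES = 5

# Hypothetical helper: run the block, retrying on StandardError up to
# max_retries attempts in total and pausing `interval` seconds in between.
# `sleeper` is injectable so the pause can be skipped in examples and tests.
def with_retries(max_retries: MAX_RETRIES, interval: RETRY_INTERVAL,
                 sleeper: ->(seconds) { sleep(seconds) })
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    raise if attempts >= max_retries # give up and re-raise the last error
    sleeper.call(interval)
    retry
  end
end

# Usage: fail twice, succeed on the third attempt (no real sleeping here).
attempt_log = []
outcome = with_retries(sleeper: ->(_seconds) {}) do |attempt|
  attempt_log << attempt
  raise IOError, "stream dropped" if attempt < 3
  :connected
end
```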
Instance Method Summary
- #initialize(config:, channel:) ⇒ WorkflowRunListener (constructor)
  A new instance of WorkflowRunListener.
- #result(workflow_run_id) ⇒ Hash
  Subscribe to a workflow run and block until it finishes.
- #shutdown ⇒ Object
  Stop the listener and clean up resources.
Constructor Details
#initialize(config:, channel:) ⇒ WorkflowRunListener
Returns a new instance of WorkflowRunListener.
    # File 'lib/hatchet/worker/workflow_run_listener.rb', line 25
    def initialize(config:, channel:)
      @config = config
      @channel = channel
      @logger = config.logger

      # Outgoing request queue. The Enumerator fed to the bidi call
      # pulls from this queue.
      @request_queue = Queue.new

      # Subscription management (protected by @mu)
      @mu = Mutex.new
      # workflow_run_id -> Array<Queue> (subscriber queues)
      @subscriptions = Hash.new { |h, k| h[k] = [] }

      # Background reader thread (lazy-started)
      @reader_thread = nil
      @started = false
    end
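The constructor's comment says an Enumerator fed to the bidi call pulls from the request queue. The listener's own Enumerator is not shown on this page, so the following is only a sketch of that general pattern. `request_stream` is a hypothetical helper, and the hashes stand in for whatever request messages the real stream sends.

```ruby
# Hypothetical helper: wrap a Queue in an Enumerator that yields each queued
# request and finishes once the queue is closed and drained
# (Queue#pop returns nil on a closed, empty queue).
def request_stream(queue)
  Enumerator.new do |yielder|
    while (request = queue.pop)
      yielder << request
    end
  end
end

request_queue = Queue.new
request_queue << { workflow_run_id: "run-1" }
request_queue << { workflow_run_id: "run-2" }
request_queue.close

sent = request_stream(request_queue).to_a
```

Because `pop` blocks while the queue is open and empty, a consumer of such an Enumerator simply waits until the next subscription request is enqueued.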
Instance Method Details
#result(workflow_run_id) ⇒ Hash
Subscribe to a workflow run and block until it finishes.
Returns the result hash keyed by task readable_id, e.g.
{"step1" => {"value" => 42}, "step2" => {...}}
Blocks indefinitely until the run reaches a terminal state.
    # File 'lib/hatchet/worker/workflow_run_listener.rb', line 55
    def result(workflow_run_id)
      event = subscribe(workflow_run_id)
      parse_event(event)
    end
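Since `#result` blocks indefinitely, a caller that needs an upper bound on the wait has to impose one itself. The sketch below shows one way to do that with Ruby's standard `Timeout` module. `StubListener` and `result_with_deadline` are hypothetical names used so the example runs without a Hatchet connection. Whether interrupting the real listener this way leaves its internal subscription state clean is not covered by this page.

```ruby
require "timeout"

# Hypothetical stand-in exposing the same #result(workflow_run_id) shape.
class StubListener
  def initialize(delay:, payload:)
    @delay = delay
    @payload = payload
  end

  # Simulates a blocking wait followed by a result hash.
  def result(_workflow_run_id)
    sleep(@delay)
    @payload
  end
end

# Returns the result hash, or nil if it does not arrive within `seconds`.
def result_with_deadline(listener, workflow_run_id, seconds)
  Timeout.timeout(seconds) { listener.result(workflow_run_id) }
rescue Timeout::Error
  nil
end

listener = StubListener.new(delay: 0, payload: { "step1" => { "value" => 42 } })
outputs = result_with_deadline(listener, "run-1", 1)
step1_value = outputs && outputs.dig("step1", "value")
```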
#shutdown ⇒ Object
Stop the listener and clean up resources.
    # File 'lib/hatchet/worker/workflow_run_listener.rb', line 61
    def shutdown
      @request_queue.close
      @reader_thread&.join(5)
    end
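The two calls in `#shutdown` rely on standard Ruby behavior: closing a `Queue` wakes any thread blocked in `pop`, and `Thread#join` with a timeout returns `nil` if the thread has not finished in time. The sketch below demonstrates that pattern in isolation. `start_worker` and `stop_worker` are hypothetical helpers, and the worker thread is a generic consumer rather than the listener's reader thread.

```ruby
# Hypothetical consumer: processes items until the queue is closed and drained.
def start_worker(queue, processed)
  Thread.new do
    while (item = queue.pop)
      processed << item
    end
  end
end

# Close the queue, then wait up to `timeout` seconds for the worker to exit.
# Returns true if the worker finished in time, false otherwise.
def stop_worker(queue, worker, timeout = 5)
  queue.close
  !worker.join(timeout).nil?
end

queue = Queue.new
processed = []
worker = start_worker(queue, processed)
queue << :a
queue << :b
stopped = stop_worker(queue, worker)
```

Items enqueued before `close` are still delivered, so the worker drains pending work before it exits.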