Class: Hatchet::DurableContext
- Defined in:
- lib/hatchet/durable_context.rb
Overview
Extended context for durable tasks that supports sleep and event-waiting across task suspensions.
Durable tasks can be suspended and resumed by the Hatchet engine, allowing long-running workflows that survive process restarts.
Uses V1::V1Dispatcher for registering and listening for durable events via bidirectional gRPC streaming.
Instance Attribute Summary collapse
-
#action_key ⇒ String?
The action key used by the eviction manager to identify this run invocation.
-
#durable_event_listener ⇒ Hatchet::WorkerRuntime::DurableEventListener?
New-style bidi listener.
-
#engine_version ⇒ String?
Engine version string advertised via GetVersion.
- #eviction_manager ⇒ Hatchet::WorkerRuntime::DurableEviction::DurableEvictionManager?
-
#invocation_count ⇒ Integer
Durable-task invocation count (>= 1).
Attributes inherited from Context
#additional_metadata, #attempt_number, #deps, #filter_payload, #priority, #retry_count, #step_run_id, #worker_id, #workflow_run_id
Instance Method Summary collapse
-
#sleep_for(duration:, label: nil) ⇒ Hash?
Sleep for a specified duration.
-
#wait_for(key, condition, label: nil) ⇒ Hash
Wait for a condition to be met (event or sleep).
Methods inherited from Context
#cancel, #cancelled?, #get_task_run_error, #initialize, #log, #put_stream, #refresh_timeout, #release_slot, #task_output, #task_run_errors, #was_skipped?, #worker
Constructor Details
This class inherits a constructor from Hatchet::Context
Instance Attribute Details
#action_key ⇒ String?
Returns The action key used by the eviction manager to identify this run invocation.
31 32 33 |
# File 'lib/hatchet/durable_context.rb', line 31 def action_key @action_key end |
#durable_event_listener ⇒ Hatchet::WorkerRuntime::DurableEventListener?
Returns New-style bidi listener. When set the context delegates through it instead of the legacy RegisterDurableEvent/ListenForDurableEvent path.
36 37 38 |
# File 'lib/hatchet/durable_context.rb', line 36 def durable_event_listener @durable_event_listener end |
#engine_version ⇒ String?
Returns Engine version string advertised via GetVersion.
42 43 44 |
# File 'lib/hatchet/durable_context.rb', line 42 def engine_version @engine_version end |
#eviction_manager ⇒ Hatchet::WorkerRuntime::DurableEviction::DurableEvictionManager?
27 28 29 |
# File 'lib/hatchet/durable_context.rb', line 27 def eviction_manager @eviction_manager end |
#invocation_count ⇒ Integer
Returns Durable-task invocation count (>= 1).
39 40 41 |
# File 'lib/hatchet/durable_context.rb', line 39 def invocation_count @invocation_count end |
Instance Method Details
#sleep_for(duration:, label: nil) ⇒ Hash?
Sleep for a specified duration. The task is suspended and resumed by the engine after the duration expires.
Delegates to #wait_for with a SleepCondition so that both sleeps and event waits share a single registration / eviction path.
53 54 55 56 57 58 59 60 |
# File 'lib/hatchet/durable_context.rb', line 53 def sleep_for(duration:, label: nil) duration_str = duration.is_a?(String) ? duration : "#{duration}s" duration_value = duration.is_a?(String) ? duration : duration.to_i wait_index = increment_wait_index signal_key = "sleep:#{duration_str}-#{wait_index}" wait_for(signal_key, Hatchet::SleepCondition.new(duration_value), label: label) end |
#wait_for(key, condition, label: nil) ⇒ Hash
Wait for a condition to be met (event or sleep). The task is suspended and resumed when the condition is satisfied.
Register the durable wait with “send_event“ first, then start eviction tracking only while blocked on “wait_for_callback“.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/hatchet/durable_context.rb', line 72 def wait_for(key, condition, label: nil) conditions = build_durable_conditions(key, condition) if supports_durable_eviction? invocation = @invocation_count || 1 event = Hatchet::WorkerRuntime::DurableEventListener::WaitForEvent.new( wait_for_conditions: conditions, label: label, ) ack = @durable_event_listener.send_event(@step_run_id, invocation, event) with_eviction_wait(wait_kind: "wait_for", resource_id: key) do result = @durable_event_listener.wait_for_callback( @step_run_id, invocation, ack[:branch_id], ack[:node_id], ) result[:payload] || {} end else with_eviction_wait(wait_kind: "wait_for", resource_id: key) do legacy_wait_for(key, conditions) end end end |