Class: Hatchet::DurableContext

Inherits:
Context
  • Object
show all
Defined in:
lib/hatchet/durable_context.rb

Overview

Extended context for durable tasks that supports sleep and event-waiting across task suspensions.

Durable tasks can be suspended and resumed by the Hatchet engine, allowing long-running workflows that survive process restarts.

Uses V1::V1Dispatcher for registering and listening for durable events via bidirectional gRPC streaming.

Examples:

Sleep for a duration

hatchet.durable_task(name: "my_task") do |input, ctx|
  ctx.sleep_for(duration: 60) # sleep for 60 seconds
end

Wait for an event

hatchet.durable_task(name: "my_task") do |input, ctx|
  result = ctx.wait_for("event", Hatchet::UserEventCondition.new(event_key: "user:update"))
end

Instance Attribute Summary collapse

Attributes inherited from Context

#additional_metadata, #attempt_number, #deps, #filter_payload, #priority, #retry_count, #step_run_id, #worker_id, #workflow_run_id

Instance Method Summary collapse

Methods inherited from Context

#cancel, #cancelled?, #get_task_run_error, #initialize, #log, #put_stream, #refresh_timeout, #release_slot, #task_output, #task_run_errors, #was_skipped?, #worker

Constructor Details

This class inherits a constructor from Hatchet::Context

Instance Attribute Details

#action_keyString?

Returns The action key used by the eviction manager to identify this run invocation.

Returns:

  • (String, nil)

    The action key used by the eviction manager to identify this run invocation.



31
32
33
# File 'lib/hatchet/durable_context.rb', line 31

def action_key
  @action_key
end

#durable_event_listenerHatchet::WorkerRuntime::DurableEventListener?

Returns New-style bidi listener. When set the context delegates through it instead of the legacy RegisterDurableEvent/ListenForDurableEvent path.

Returns:



36
37
38
# File 'lib/hatchet/durable_context.rb', line 36

def durable_event_listener
  @durable_event_listener
end

#engine_versionString?

Returns Engine version string advertised via GetVersion.

Returns:

  • (String, nil)

    Engine version string advertised via GetVersion.



42
43
44
# File 'lib/hatchet/durable_context.rb', line 42

def engine_version
  @engine_version
end

#eviction_managerHatchet::WorkerRuntime::DurableEviction::DurableEvictionManager?



27
28
29
# File 'lib/hatchet/durable_context.rb', line 27

def eviction_manager
  @eviction_manager
end

#invocation_countInteger

Returns Durable-task invocation count (>= 1).

Returns:

  • (Integer)

    Durable-task invocation count (>= 1).



39
40
41
# File 'lib/hatchet/durable_context.rb', line 39

def invocation_count
  @invocation_count
end

Instance Method Details

#sleep_for(duration:, label: nil) ⇒ Hash?

Sleep for a specified duration. The task is suspended and resumed by the engine after the duration expires.

Delegates to #wait_for with a SleepCondition so that both sleeps and event waits share a single registration / eviction path.

Parameters:

  • duration (Integer, String)

    Duration in seconds, or a duration string (e.g. “60s”)

  • label (String, nil) (defaults to: nil)

    Optional wait label shown in durable event logs.

Returns:

  • (Hash, nil)

    Result from the sleep event



53
54
55
56
57
58
59
60
# File 'lib/hatchet/durable_context.rb', line 53

def sleep_for(duration:, label: nil)
  duration_str = duration.is_a?(String) ? duration : "#{duration}s"
  duration_value = duration.is_a?(String) ? duration : duration.to_i
  wait_index = increment_wait_index
  signal_key = "sleep:#{duration_str}-#{wait_index}"

  wait_for(signal_key, Hatchet::SleepCondition.new(duration_value), label: label)
end

#wait_for(key, condition, label: nil) ⇒ Hash

Wait for a condition to be met (event or sleep). The task is suspended and resumed when the condition is satisfied.

Register the durable wait with “send_event“ first, then start eviction tracking only while blocked on “wait_for_callback“.

Parameters:

  • key (String)

    A unique key for this wait operation

  • condition (Object)

    The condition to wait for (UserEventCondition, SleepCondition, Hash, etc.)

  • label (String, nil) (defaults to: nil)

    Optional wait label shown in durable event logs.

Returns:

  • (Hash)

    Result from the wait, including which condition was satisfied



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/hatchet/durable_context.rb', line 72

def wait_for(key, condition, label: nil)
  conditions = build_durable_conditions(key, condition)

  if supports_durable_eviction?
    invocation = @invocation_count || 1

    event = Hatchet::WorkerRuntime::DurableEventListener::WaitForEvent.new(
      wait_for_conditions: conditions,
      label: label,
    )

    ack = @durable_event_listener.send_event(@step_run_id, invocation, event)

    with_eviction_wait(wait_kind: "wait_for", resource_id: key) do
      result = @durable_event_listener.wait_for_callback(
        @step_run_id,
        invocation,
        ack[:branch_id],
        ack[:node_id],
      )

      result[:payload] || {}
    end
  else
    with_eviction_wait(wait_kind: "wait_for", resource_id: key) do
      legacy_wait_for(key, conditions)
    end
  end
end