Class: Hatchet::WorkerRuntime::DurableEviction::DurableEvictionManager

Inherits:
Object
Defined in:
lib/hatchet/worker/durable_eviction/manager.rb

Overview

Orchestrates durable-task eviction.

Runs a background thread that periodically selects an eviction candidate from the cache, asks the server to evict it, and then interrupts the local task thread.

Mirrors the Python SDK class `hatchet_sdk.worker.durable_eviction.manager.DurableEvictionManager`.

Constructor Details

#initialize(durable_slots:, cancel_local:, request_eviction_with_ack:, config: DEFAULT_DURABLE_EVICTION_CONFIG, cache: nil, logger: nil) ⇒ DurableEvictionManager

Returns a new instance of DurableEvictionManager.

Parameters:

  • durable_slots (Integer)
  • cancel_local (Proc)

    Called with the action key when the manager decides to evict a local run. Normally invoked after the server ACK; during shutdown (#evict_all_waiting) the local cancel proceeds even if the ACK fails.

  • request_eviction_with_ack (Proc)

    Called with (action_key, DurableRunRecord) to send the eviction RPC to the server and block until acknowledged.

  • config (DurableEvictionConfig) (defaults to: DEFAULT_DURABLE_EVICTION_CONFIG)
  • cache (DurableEvictionCache, nil) (defaults to: nil)
  • logger (Logger, nil) (defaults to: nil)


# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 49

def initialize(
  durable_slots:,
  cancel_local:,
  request_eviction_with_ack:,
  config: DEFAULT_DURABLE_EVICTION_CONFIG,
  cache: nil,
  logger: nil
)
  @durable_slots = durable_slots
  @cancel_local = cancel_local
  @request_eviction_with_ack = request_eviction_with_ack
  @config = config
  @cache = cache || DurableEvictionCache.new
  @logger = logger

  @thread = nil
  @tick_monitor = Monitor.new
  @stopped = false
end
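
The two callback parameters can be any callable. The following is a minimal sketch of their expected shape only; it does not construct the manager. `CallbackDemoRecord` is a hypothetical stand-in for DurableRunRecord, and the key format is made up for illustration.

```ruby
# Hypothetical stand-in for DurableRunRecord; only the fields used below.
CallbackDemoRecord = Struct.new(:key, :step_run_id, keyword_init: true)

events = []

# request_eviction_with_ack: receives (action_key, record) and is expected
# to block until the server acknowledges. Here it only records the call.
request_eviction_with_ack = lambda do |action_key, rec|
  events << [:rpc, action_key, rec.step_run_id]
end

# cancel_local: receives the action key of the run to interrupt locally.
cancel_local = ->(action_key) { events << [:cancel, action_key] }

# Documented order: server ACK first, then the local cancel.
rec = CallbackDemoRecord.new(key: "run-1/0", step_run_id: "run-1")
request_eviction_with_ack.call(rec.key, rec)
cancel_local.call(rec.key)
```

Callables with these arities would be passed as the `request_eviction_with_ack:` and `cancel_local:` keyword arguments.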

Instance Attribute Details

#cache ⇒ DurableEvictionCache (readonly)



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 39

def cache
  @cache
end

Instance Method Details

#evict_all_waiting ⇒ Integer

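Evict every currently waiting durable run. Used during graceful shutdown. Stops the background ticker first, then requests eviction for each waiting run. A failed eviction request is logged and does not prevent the local cancel.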

Returns:

  • (Integer)

    number of waiting runs cancelled locally, including those whose server eviction request failed



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 135

def evict_all_waiting
  stop

  waiting = @cache.all_waiting
  evicted = 0

  waiting.each do |rec|
    rec.eviction_reason = DurableEvictionCache.build_eviction_reason(
      EvictionCause::WORKER_SHUTDOWN, rec,
    )

    @logger&.debug(
      "DurableEvictionManager: shutdown-evicting durable run " \
      "step_run_id=#{rec.step_run_id} wait_kind=#{rec.wait_kind} " \
      "resource_id=#{rec.wait_resource_id}",
    )

    begin
      @request_eviction_with_ack.call(rec.key, rec)
    rescue StandardError => e
      @logger&.error(
        "DurableEvictionManager: failed to send eviction for " \
        "step_run_id=#{rec.step_run_id}: #{e.class}: #{e.message}",
      )
    end

    # Always cancel locally even if the server ACK failed, so the
    # future settles and shutdown doesn't hang.
    evict_run(rec.key)
    evicted += 1
  end

  evicted
end
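
The failure-tolerant loop above can be exercised in isolation. This is a minimal sketch, not the SDK implementation: `ShutdownDemoRecord`, `shutdown_evict`, and the `send_rpc`/`cancel` callables are hypothetical names, and errors are collected into an array instead of being logged.

```ruby
# Hypothetical record type standing in for DurableRunRecord.
ShutdownDemoRecord = Struct.new(:key, :step_run_id)

# Tries the eviction RPC for each record, records any failure, and always
# cancels locally afterwards. Returns the number of records processed.
def shutdown_evict(records, send_rpc:, cancel:, errors:)
  evicted = 0
  records.each do |rec|
    begin
      send_rpc.call(rec.key, rec)
    rescue StandardError => e
      errors << "#{rec.step_run_id}: #{e.class}: #{e.message}"
    end
    cancel.call(rec.key) # runs whether or not the RPC succeeded
    evicted += 1
  end
  evicted
end

cancelled = []
errors = []
# Simulated RPC that fails for key "b" only.
flaky_rpc = ->(key, _rec) { raise IOError, "server unreachable" if key == "b" }
records = [
  ShutdownDemoRecord.new("a", "run-a"),
  ShutdownDemoRecord.new("b", "run-b"),
]

count = shutdown_evict(
  records,
  send_rpc: flaky_rpc,
  cancel: ->(key) { cancelled << key },
  errors: errors,
)
```

Both runs are cancelled and counted even though the request for "b" raised, which is why the return value should be read as a count of local cancellations.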

#handle_server_eviction(step_run_id, invocation_count) ⇒ Object

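Handle a server-initiated eviction notification for a stale invocation. Does nothing if no run with the given step_run_id is cached, or if the cached record's invocation_count differs from the notified one; otherwise evicts the run locally.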



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 118

def handle_server_eviction(step_run_id, invocation_count)
  key = @cache.find_key_by_step_run_id(step_run_id)
  return unless key

  rec = @cache.get(key)
  return if rec && rec.invocation_count != invocation_count

  @logger&.info(
    "DurableEvictionManager: server-initiated eviction for " \
    "step_run_id=#{step_run_id} invocation_count=#{invocation_count}",
  )
  evict_run(key)
end
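
The invocation-count guard can be illustrated with a plain Hash in place of the cache. This is a sketch under assumptions: `GuardDemoRun` and `key_to_evict` are hypothetical, and unlike the method above the lookup here always yields a record together with its key.

```ruby
# Hypothetical cache entry; the real DurableRunRecord has more fields.
GuardDemoRun = Struct.new(:step_run_id, :invocation_count)

# Returns the key to evict, or nil when the notification should be ignored.
def key_to_evict(cache, step_run_id, invocation_count)
  key, rec = cache.find { |_k, r| r.step_run_id == step_run_id }
  return nil unless key                                   # unknown step run
  return nil if rec.invocation_count != invocation_count  # different invocation

  key
end

cache = { "run-1/2" => GuardDemoRun.new("run-1", 2) }

matching = key_to_evict(cache, "run-1", 2)  # counts agree: evict
mismatch = key_to_evict(cache, "run-1", 1)  # counts differ: ignore
unknown  = key_to_evict(cache, "run-9", 2)  # not cached: ignore
```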

#mark_active(key) ⇒ Object

Mark the run as active (decrements the wait counter).



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 113

def mark_active(key)
  @cache.mark_active(key, now: now)
end

#mark_waiting(key, wait_kind:, resource_id:) ⇒ Object

Mark the run as waiting (increments the wait counter).



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 108

def mark_waiting(key, wait_kind:, resource_id:)
  @cache.mark_waiting(key, now: now, wait_kind: wait_kind, resource_id: resource_id)
end
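
How a wait counter might behave when #mark_waiting and #mark_active calls are paired is sketched below. The cache internals are not shown in this page, so everything here is an assumption: `WaitDemoState`, the `waiting_since` field, and the clamp at zero are hypothetical, and these helpers take the state directly rather than a key.

```ruby
# Hypothetical record: a run counts as waiting while wait_count > 0.
WaitDemoState = Struct.new(:wait_count, :waiting_since) do
  def waiting?
    wait_count.positive?
  end
end

# Increment the counter; remember when the run first started waiting.
def mark_waiting(state, now:)
  state.waiting_since = now if state.wait_count.zero?
  state.wait_count += 1
end

# Decrement the counter (never below zero); clear the timestamp at zero.
def mark_active(state)
  state.wait_count = [state.wait_count - 1, 0].max
  state.waiting_since = nil if state.wait_count.zero?
end

state = WaitDemoState.new(0, nil)
mark_waiting(state, now: 10.0)
mark_waiting(state, now: 12.0)   # nested wait keeps the original timestamp
mark_active(state)
still_waiting = state.waiting?   # one wait is still outstanding
since = state.waiting_since
mark_active(state)
```

A counter rather than a boolean lets overlapping waits on the same run resolve in any order without the run being treated as active too early.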

#register_run(key, step_run_id:, invocation_count:, eviction_policy:) ⇒ Object

Register a new durable run invocation, timestamped with the manager's internal clock (`now`).



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 92

def register_run(key, step_run_id:, invocation_count:, eviction_policy:)
  @cache.register_run(
    key,
    step_run_id: step_run_id,
    invocation_count: invocation_count,
    now: now,
    eviction_policy: eviction_policy,
  )
end

#start ⇒ Object

Start the background eviction ticker. Idempotent.



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 70

def start
  return if @thread&.alive?

  @stopped = false
  @thread = Thread.new { run_loop }
end

#stop ⇒ Object

Signal the background thread to stop. Does not join.



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 78

def stop
  @stopped = true
  thread = @thread
  return unless thread&.alive?

  begin
    thread.wakeup
  rescue ThreadError
    nil
  end
end
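
The start/stop pattern (a flag plus Thread#wakeup to cut a sleep short) can be tried with a stand-alone ticker. This is a sketch only: `SketchTicker` is a hypothetical class, its loop body is a tick counter, and the real run_loop is not shown in this page.

```ruby
# Illustrative ticker, not the SDK class: sleeps between ticks and can be
# woken early by #stop.
class SketchTicker
  attr_reader :ticks, :thread

  def initialize(interval:)
    @interval = interval
    @stopped = false
    @thread = nil
    @ticks = 0
  end

  # Idempotent: a second call while the thread is alive does nothing.
  def start
    return if @thread&.alive?

    @stopped = false
    @thread = Thread.new do
      until @stopped
        @ticks += 1
        sleep(@interval)
      end
    end
  end

  # Sets the flag and wakes the sleeping thread; does not join.
  def stop
    @stopped = true
    thread = @thread
    return unless thread&.alive?

    begin
      thread.wakeup
    rescue ThreadError
      nil # thread died between the alive? check and wakeup
    end
  end
end

ticker = SketchTicker.new(interval: 60)
ticker.start
first_thread = ticker.thread
ticker.start                                  # no new thread is created
same_thread = first_thread.equal?(ticker.thread)

sleep 0.05                                    # let the ticker reach its sleep
ticker.stop
finished = !ticker.thread.join(2).nil?        # join returns nil on timeout
```

Because stop does not join, a caller that needs the thread to have exited must join it separately, as the last line does. In this simple sketch a wakeup that arrives between the flag check and the sleep would be lost, which is one reason to prefer a condition variable for the wait.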

#unregister_run(key) ⇒ Object

Unregister a durable run invocation.



# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 103

def unregister_run(key)
  @cache.unregister_run(key)
end