Class: Hatchet::WorkerRuntime::DurableEviction::DurableEvictionManager
- Inherits:
-
Object
- Object
- Hatchet::WorkerRuntime::DurableEviction::DurableEvictionManager
- Defined in:
- lib/hatchet/worker/durable_eviction/manager.rb
Overview
Orchestrates durable-task eviction.
Runs a background thread that periodically selects an eviction candidate from the cache, asks the server to evict it, and then interrupts the local task thread.
Mirrors :class:‘hatchet_sdk.worker.durable_eviction.manager.DurableEvictionManager`.
Instance Attribute Summary collapse
- #cache ⇒ DurableEvictionCache readonly
Instance Method Summary collapse
-
#evict_all_waiting ⇒ Integer
Evict every currently-waiting durable run.
-
#handle_server_eviction(step_run_id, invocation_count) ⇒ Object
Handle a server-initiated eviction notification for a stale invocation.
-
#initialize(durable_slots:, cancel_local:, request_eviction_with_ack:, config: DEFAULT_DURABLE_EVICTION_CONFIG, cache: nil, logger: nil) ⇒ DurableEvictionManager
constructor
A new instance of DurableEvictionManager.
-
#mark_active(key) ⇒ Object
Mark the run as active (decrements the wait counter).
-
#mark_waiting(key, wait_kind:, resource_id:) ⇒ Object
Mark the run as waiting (increments the wait counter).
-
#register_run(key, step_run_id:, invocation_count:, eviction_policy:) ⇒ Object
Register a new durable run invocation.
-
#start ⇒ Object
Start the background eviction ticker.
-
#stop ⇒ Object
Signal the background thread to stop.
-
#unregister_run(key) ⇒ Object
Unregister a durable run invocation.
Constructor Details
#initialize(durable_slots:, cancel_local:, request_eviction_with_ack:, config: DEFAULT_DURABLE_EVICTION_CONFIG, cache: nil, logger: nil) ⇒ DurableEvictionManager
Returns a new instance of DurableEvictionManager.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 49 def initialize( durable_slots:, cancel_local:, request_eviction_with_ack:, config: DEFAULT_DURABLE_EVICTION_CONFIG, cache: nil, logger: nil ) @durable_slots = durable_slots @cancel_local = cancel_local @request_eviction_with_ack = request_eviction_with_ack @config = config @cache = cache || DurableEvictionCache.new @logger = logger @thread = nil @tick_monitor = Monitor.new @stopped = false end |
Instance Attribute Details
#cache ⇒ DurableEvictionCache (readonly)
39 40 41 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 39 def cache @cache end |
Instance Method Details
#evict_all_waiting ⇒ Integer
Evict every currently-waiting durable run. Used during graceful shutdown.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 135 def evict_all_waiting stop waiting = @cache.all_waiting evicted = 0 waiting.each do |rec| rec.eviction_reason = DurableEvictionCache.build_eviction_reason( EvictionCause::WORKER_SHUTDOWN, rec, ) @logger&.debug( "DurableEvictionManager: shutdown-evicting durable run " \ "step_run_id=#{rec.step_run_id} wait_kind=#{rec.wait_kind} " \ "resource_id=#{rec.wait_resource_id}", ) begin @request_eviction_with_ack.call(rec.key, rec) rescue StandardError => e @logger&.error( "DurableEvictionManager: failed to send eviction for " \ "step_run_id=#{rec.step_run_id}: #{e.class}: #{e.}", ) end # Always cancel locally even if the server ACK failed, so the # future settles and shutdown doesn't hang. evict_run(rec.key) evicted += 1 end evicted end |
#handle_server_eviction(step_run_id, invocation_count) ⇒ Object
Handle a server-initiated eviction notification for a stale invocation.
118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 118 def handle_server_eviction(step_run_id, invocation_count) key = @cache.find_key_by_step_run_id(step_run_id) return unless key rec = @cache.get(key) return if rec && rec.invocation_count != invocation_count @logger&.info( "DurableEvictionManager: server-initiated eviction for " \ "step_run_id=#{step_run_id} invocation_count=#{invocation_count}", ) evict_run(key) end |
#mark_active(key) ⇒ Object
Mark the run as active (decrements the wait counter).
113 114 115 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 113 def mark_active(key) @cache.mark_active(key, now: now) end |
#mark_waiting(key, wait_kind:, resource_id:) ⇒ Object
Mark the run as waiting (increments the wait counter).
108 109 110 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 108 def mark_waiting(key, wait_kind:, resource_id:) @cache.mark_waiting(key, now: now, wait_kind: wait_kind, resource_id: resource_id) end |
#register_run(key, step_run_id:, invocation_count:, eviction_policy:) ⇒ Object
Register a new durable run invocation. Takes the current time from the system clock.
92 93 94 95 96 97 98 99 100 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 92 def register_run(key, step_run_id:, invocation_count:, eviction_policy:) @cache.register_run( key, step_run_id: step_run_id, invocation_count: invocation_count, now: now, eviction_policy: eviction_policy, ) end |
#start ⇒ Object
Start the background eviction ticker. Idempotent.
70 71 72 73 74 75 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 70 def start return if @thread&.alive? @stopped = false @thread = Thread.new { run_loop } end |
#stop ⇒ Object
Signal the background thread to stop. Does not join.
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 78 def stop @stopped = true thread = @thread return unless thread&.alive? begin thread.wakeup rescue ThreadError nil end end |
#unregister_run(key) ⇒ Object
Unregister a durable run invocation.
103 104 105 |
# File 'lib/hatchet/worker/durable_eviction/manager.rb', line 103 def unregister_run(key) @cache.unregister_run(key) end |