Class: Hatchet::WorkerRuntime::DurableEviction::DurableEvictionCache
- Inherits: Object
- Defined in: lib/hatchet/worker/durable_eviction/cache.rb
Overview
Thread-safe in-memory cache of waiting durable task invocations.
Mirrors `hatchet_sdk.worker.durable_eviction.cache.DurableEvictionCache` from the Python SDK. All public instance methods synchronize on an internal Monitor.
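As a rough illustration of the locking approach described above, the sketch below guards a Hash with a Monitor. `TinyCache` and its methods are hypothetical names and are not part of the Hatchet SDK. The point is only that Ruby's Monitor is reentrant, so one synchronized method can call another on the same thread without deadlocking.

```ruby
require "monitor"

# Hypothetical stand-in class, not part of the Hatchet SDK.
class TinyCache
  def initialize
    @runs = {}
    @monitor = Monitor.new
  end

  def put(key, value)
    @monitor.synchronize { @runs[key] = value }
  end

  def get(key)
    @monitor.synchronize { @runs[key] }
  end

  # Calls #get and #put while already holding the monitor. Monitor lets
  # the same thread re-enter; a plain Mutex would raise ThreadError here.
  def fetch_or_put(key, value)
    @monitor.synchronize { get(key) || put(key, value) }
  end
end

cache = TinyCache.new

# Several threads write concurrently; each write happens under the monitor.
threads = 4.times.map do |i|
  Thread.new { 100.times { |j| cache.put("run-#{i}-#{j}", j) } }
end
threads.each(&:join)

existing = cache.fetch_or_put("run-3-99", :ignored) # key already present
created = cache.fetch_or_put("new-key", 7)          # key absent, value stored
```

A Monitor costs slightly more than a Mutex, but it keeps helper methods callable from inside other synchronized methods.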
Class Method Summary
- .build_eviction_reason(cause, rec, ttl: nil) ⇒ Object
  Build a human-readable eviction reason string.
Instance Method Summary
- #all_waiting ⇒ Array<DurableRunRecord>
- #find_key_by_step_run_id(step_run_id) ⇒ String?
  The action key for the matching record.
- #get(key) ⇒ DurableRunRecord?
  Fetch the record for a given key.
- #initialize ⇒ DurableEvictionCache (constructor)
  A new instance of DurableEvictionCache.
- #mark_active(key, now:) ⇒ Object
  Mark the run as active (decrement the wait counter).
- #mark_waiting(key, now:, wait_kind:, resource_id:) ⇒ Object
  Mark the run as waiting (ref-counted).
- #register_run(key, step_run_id:, invocation_count:, now:, eviction_policy:) ⇒ Object
  Register a new durable run invocation.
- #select_eviction_candidate(now:, durable_slots:, reserve_slots:, min_wait_for_capacity_eviction:) ⇒ String?
  Select an eviction candidate, preferring TTL-eligible candidates first, then capacity-pressure candidates (only when above the waiting capacity threshold).
- #unregister_run(key) ⇒ Object
  Unregister a durable run invocation.
Constructor Details
#initialize ⇒ DurableEvictionCache
Returns a new instance of DurableEvictionCache.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 52
    def initialize
      @runs = {}
      @monitor = Monitor.new
    end
Class Method Details
.build_eviction_reason(cause, rec, ttl: nil) ⇒ Object
Build a human-readable eviction reason string.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 175
    def self.build_eviction_reason(cause, rec, ttl: nil)
      wait_desc = rec.wait_kind || "unknown"
      wait_desc = "#{wait_desc}(#{rec.wait_resource_id})" if rec.wait_resource_id

      case cause
      when EvictionCause::TTL_EXCEEDED
        ttl_str = ttl ? " (#{ttl}s)" : ""
        "Wait TTL#{ttl_str} exceeded while waiting on #{wait_desc}"
      when EvictionCause::CAPACITY_PRESSURE
        "Worker at capacity while waiting on #{wait_desc}"
      when EvictionCause::WORKER_SHUTDOWN
        "Worker shutdown while waiting on #{wait_desc}"
      else
        raise ArgumentError, "Unknown eviction cause: #{cause}"
      end
    end
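To make the resulting strings concrete, here is a self-contained sketch that reproduces the logic above with stand-in types. The symbol values given to `EvictionCause` and the `ReasonRecord` struct are assumptions made so the example runs on its own; the real constants and `DurableRunRecord` are defined elsewhere in the gem.

```ruby
# Hypothetical stand-ins: the real constant values are not shown on this page.
module EvictionCause
  TTL_EXCEEDED = :ttl_exceeded
  CAPACITY_PRESSURE = :capacity_pressure
  WORKER_SHUTDOWN = :worker_shutdown
end

# Only the two fields the reason builder reads.
ReasonRecord = Struct.new(:wait_kind, :wait_resource_id, keyword_init: true)

# Copy of the documented logic so the example is runnable in isolation.
def build_eviction_reason(cause, rec, ttl: nil)
  wait_desc = rec.wait_kind || "unknown"
  wait_desc = "#{wait_desc}(#{rec.wait_resource_id})" if rec.wait_resource_id

  case cause
  when EvictionCause::TTL_EXCEEDED
    ttl_str = ttl ? " (#{ttl}s)" : ""
    "Wait TTL#{ttl_str} exceeded while waiting on #{wait_desc}"
  when EvictionCause::CAPACITY_PRESSURE
    "Worker at capacity while waiting on #{wait_desc}"
  when EvictionCause::WORKER_SHUTDOWN
    "Worker shutdown while waiting on #{wait_desc}"
  else
    raise ArgumentError, "Unknown eviction cause: #{cause}"
  end
end

sleeping = ReasonRecord.new(wait_kind: "sleep", wait_resource_id: "abc")
no_metadata = ReasonRecord.new

ttl_reason = build_eviction_reason(EvictionCause::TTL_EXCEEDED, sleeping, ttl: 30)
# => "Wait TTL (30s) exceeded while waiting on sleep(abc)"

shutdown_reason = build_eviction_reason(EvictionCause::WORKER_SHUTDOWN, no_metadata)
# => "Worker shutdown while waiting on unknown"
```

Note that the `ttl:` keyword only affects the TTL_EXCEEDED message; the other causes ignore it.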
Instance Method Details
#all_waiting ⇒ Array<DurableRunRecord>
Returns every registered record that is currently waiting.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 82
    def all_waiting
      @monitor.synchronize { @runs.values.select(&:waiting?) }
    end
#find_key_by_step_run_id(step_run_id) ⇒ String?
Returns the action key for the first record whose step_run_id matches, or nil if no record matches.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 88
    def find_key_by_step_run_id(step_run_id)
      @monitor.synchronize do
        @runs.each do |k, rec|
          return k if rec.step_run_id == step_run_id
        end
        nil
      end
    end
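The lookup is a linear scan over the registered runs. As a sketch of the same idea outside the class, the snippet below uses `Hash#find` on a plain Hash; `StepRecord`, the keys, and the IDs are hypothetical and exist only for this example.

```ruby
# Hypothetical stand-in with only the field the lookup reads.
StepRecord = Struct.new(:step_run_id)

runs = {
  "action-1" => StepRecord.new("step-aaa"),
  "action-2" => StepRecord.new("step-bbb"),
}

# Hash#find yields [key, value] pairs and returns the first matching pair,
# or nil when nothing matches; &.first then extracts the key.
find_key = lambda do |step_run_id|
  runs.find { |_key, rec| rec.step_run_id == step_run_id }&.first
end

hit = find_key.call("step-bbb")  # => "action-2"
miss = find_key.call("step-zzz") # => nil
```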
#get(key) ⇒ DurableRunRecord?
Fetch the record for a given key.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 77
    def get(key)
      @monitor.synchronize { @runs[key] }
    end
#mark_active(key, now:) ⇒ Object
Mark the run as active (decrement the wait counter). Floors at zero so unmatched mark_active calls never underflow.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 113
    def mark_active(key, now:)
      @monitor.synchronize do
        rec = @runs[key]
        return unless rec

        rec.wait_count = [rec.wait_count - 1, 0].max
        if rec.wait_count.zero?
          rec.waiting_since = nil
          rec.wait_kind = nil
          rec.wait_resource_id = nil
        end
      end
    end
#mark_waiting(key, now:, wait_kind:, resource_id:) ⇒ Object
Mark the run as waiting (ref-counted). Increments the wait counter and stores the wait metadata on the record.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 99
    def mark_waiting(key, now:, wait_kind:, resource_id:)
      @monitor.synchronize do
        rec = @runs[key]
        return unless rec

        rec.wait_count += 1
        rec.waiting_since = now if rec.wait_count == 1
        rec.wait_kind = wait_kind
        rec.wait_resource_id = resource_id
      end
    end
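The ref-counting in mark_waiting and mark_active can be sketched in isolation as follows. `WaitRecord` is a hypothetical, trimmed-down record, and its `waiting?` definition (counter above zero) is an assumption, since the real DurableRunRecord is not shown on this page. Timestamps are plain floats for simplicity.

```ruby
# Hypothetical record with only the fields needed for the counter logic.
WaitRecord = Struct.new(:wait_count, :waiting_since, keyword_init: true) do
  # Assumption: a record counts as waiting while its counter is above zero.
  def waiting?
    wait_count.positive?
  end
end

# Same counter updates as the documented methods, minus locking and metadata.
mark_waiting = lambda do |rec, now|
  rec.wait_count += 1
  rec.waiting_since = now if rec.wait_count == 1
end

mark_active = lambda do |rec|
  rec.wait_count = [rec.wait_count - 1, 0].max
  rec.waiting_since = nil if rec.wait_count.zero?
end

rec = WaitRecord.new(wait_count: 0, waiting_since: nil)

mark_waiting.call(rec, 100.0) # first wait starts the clock
mark_waiting.call(rec, 105.0) # overlapping wait keeps the original timestamp
mark_active.call(rec)         # one wait resolved, one still outstanding

still_waiting = rec.waiting?         # => true
since_after_one = rec.waiting_since  # => 100.0

mark_active.call(rec) # last wait resolved, record becomes active
mark_active.call(rec) # unmatched call: the counter stays at zero
```

Because waiting_since is only set on the transition from zero to one, the wait duration used for eviction is measured from the earliest outstanding wait, not the most recent one.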
#register_run(key, step_run_id:, invocation_count:, now:, eviction_policy:) ⇒ Object
Register a new durable run invocation.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 58
    def register_run(key, step_run_id:, invocation_count:, now:, eviction_policy:)
      @monitor.synchronize do
        @runs[key] = DurableRunRecord.new(
          key: key,
          step_run_id: step_run_id,
          invocation_count: invocation_count,
          eviction_policy: eviction_policy,
          registered_at: now,
        )
      end
    end
#select_eviction_candidate(now:, durable_slots:, reserve_slots:, min_wait_for_capacity_eviction:) ⇒ String?
Select an eviction candidate, preferring TTL-eligible candidates first, then capacity-pressure candidates (only when above the waiting capacity threshold). Returns the chosen record's key, or nil when no waiting record qualifies. Within each group, the record with the lowest priority value is chosen, with ties broken by the earliest waiting_since. The eviction reason is stored on the chosen record before its key is returned.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 132
    def select_eviction_candidate(now:, durable_slots:, reserve_slots:, min_wait_for_capacity_eviction:)
      @monitor.synchronize do
        waiting = @runs.values.select do |r|
          r.waiting? && !r.eviction_policy.nil?
        end

        return nil if waiting.empty?

        ttl_eligible = waiting.select do |r|
          policy = r.eviction_policy
          policy&.ttl && r.waiting_since && (now - r.waiting_since) >= policy.ttl
        end

        unless ttl_eligible.empty?
          chosen = ttl_eligible.min_by do |r|
            [r.eviction_policy ? r.eviction_policy.priority : 0, r.waiting_since || now]
          end
          ttl = chosen.eviction_policy&.ttl
          chosen.eviction_reason = DurableEvictionCache.build_eviction_reason(
            EvictionCause::TTL_EXCEEDED, chosen, ttl: ttl,
          )
          return chosen.key
        end

        return nil unless capacity_pressure?(durable_slots, reserve_slots, waiting.length)

        capacity_candidates = waiting.select do |r|
          r.eviction_policy&.allow_capacity_eviction &&
            r.waiting_since &&
            (now - r.waiting_since) >= min_wait_for_capacity_eviction
        end

        return nil if capacity_candidates.empty?

        chosen = capacity_candidates.min_by do |r|
          [r.eviction_policy ? r.eviction_policy.priority : 0, r.waiting_since || now]
        end
        chosen.eviction_reason = DurableEvictionCache.build_eviction_reason(
          EvictionCause::CAPACITY_PRESSURE, chosen,
        )
        chosen.key
      end
    end
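The TTL phase of the selection can be sketched on its own with plain structs. `SketchPolicy` and `SketchRun` are hypothetical stand-ins, and the sketch assumes that `now`, `waiting_since`, and `ttl` are all numeric seconds; the real types are not shown on this page. The capacity phase is left out because it depends on a private `capacity_pressure?` helper whose body is not documented here.

```ruby
# Hypothetical stand-ins carrying only the fields the TTL phase reads.
SketchPolicy = Struct.new(:ttl, :priority, keyword_init: true)
SketchRun = Struct.new(:key, :waiting_since, :eviction_policy, keyword_init: true)

now = 1_000.0
runs = [
  SketchRun.new(key: "a", waiting_since: 900.0, eviction_policy: SketchPolicy.new(ttl: 60, priority: 5)),
  SketchRun.new(key: "b", waiting_since: 950.0, eviction_policy: SketchPolicy.new(ttl: 30, priority: 1)),
  SketchRun.new(key: "c", waiting_since: 800.0, eviction_policy: SketchPolicy.new(ttl: 30, priority: 1)),
  SketchRun.new(key: "d", waiting_since: 990.0, eviction_policy: SketchPolicy.new(ttl: 60, priority: 0)),
]

# Step 1: keep only runs that have waited at least as long as their TTL.
ttl_eligible = runs.select do |r|
  (now - r.waiting_since) >= r.eviction_policy.ttl
end

# Step 2: arrays compare element by element, so min_by picks the lowest
# priority value first and the earliest waiting_since among ties.
chosen = ttl_eligible.min_by do |r|
  [r.eviction_policy.priority, r.waiting_since]
end

eligible_keys = ttl_eligible.map(&:key) # => ["a", "b", "c"]
chosen_key = chosen.key                 # => "c"
```

Run "d" has the lowest priority value but has only waited 10 seconds against a 60 second TTL, so it never enters the comparison. Runs "b" and "c" tie on priority, and "c" wins because it has been waiting longer.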
#unregister_run(key) ⇒ Object
Unregister a durable run invocation.
    # File 'lib/hatchet/worker/durable_eviction/cache.rb', line 71
    def unregister_run(key)
      @monitor.synchronize { @runs.delete(key) }
    end