Class: Hatchet::WorkerRuntime::DurableEviction::DurableEvictionCache

Inherits:
Object
  • Object
show all
Defined in:
lib/hatchet/worker/durable_eviction/cache.rb

Overview

Thread-safe in-memory cache of waiting durable task invocations.

Mirrors :class:‘hatchet_sdk.worker.durable_eviction.cache.DurableEvictionCache` from the Python SDK. All public methods lock an internal monitor.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeDurableEvictionCache

Returns a new instance of DurableEvictionCache.



52
53
54
55
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 52

def initialize
  @runs = {}
  @monitor = Monitor.new
end

Class Method Details

.build_eviction_reason(cause, rec, ttl: nil) ⇒ Object

Build a human-readable eviction reason string.



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 175

def self.build_eviction_reason(cause, rec, ttl: nil)
  wait_desc = rec.wait_kind || "unknown"
  wait_desc = "#{wait_desc}(#{rec.wait_resource_id})" if rec.wait_resource_id

  case cause
  when EvictionCause::TTL_EXCEEDED
    ttl_str = ttl ? " (#{ttl}s)" : ""
    "Wait TTL#{ttl_str} exceeded while waiting on #{wait_desc}"
  when EvictionCause::CAPACITY_PRESSURE
    "Worker at capacity while waiting on #{wait_desc}"
  when EvictionCause::WORKER_SHUTDOWN
    "Worker shutdown while waiting on #{wait_desc}"
  else
    raise ArgumentError, "Unknown eviction cause: #{cause}"
  end
end

Instance Method Details

#all_waitingArray<DurableRunRecord>

Returns:



82
83
84
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 82

def all_waiting
  @monitor.synchronize { @runs.values.select(&:waiting?) }
end

#find_key_by_step_run_id(step_run_id) ⇒ String?

Returns the action key for the matching record.

Parameters:

  • step_run_id (String)

Returns:

  • (String, nil)

    the action key for the matching record



88
89
90
91
92
93
94
95
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 88

def find_key_by_step_run_id(step_run_id)
  @monitor.synchronize do
    @runs.each do |k, rec|
      return k if rec.step_run_id == step_run_id
    end
    nil
  end
end

#get(key) ⇒ DurableRunRecord?

Fetch the record for a given key.

Returns:



77
78
79
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 77

def get(key)
  @monitor.synchronize { @runs[key] }
end

#mark_active(key, now:) ⇒ Object

Mark the run as active (decrement the wait counter). Floors at zero so unmatched mark_active calls never underflow.



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 113

def mark_active(key, now:)
  @monitor.synchronize do
    rec = @runs[key]
    return unless rec

    rec.wait_count = [rec.wait_count - 1, 0].max
    if rec.wait_count.zero?
      rec.waiting_since = nil
      rec.wait_kind = nil
      rec.wait_resource_id = nil
    end
  end
end

#mark_waiting(key, now:, wait_kind:, resource_id:) ⇒ Object

Mark the run as waiting (ref-counted). Increments the wait counter and stores the wait metadata on the record.



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 99

def mark_waiting(key, now:, wait_kind:, resource_id:)
  @monitor.synchronize do
    rec = @runs[key]
    return unless rec

    rec.wait_count += 1
    rec.waiting_since = now if rec.wait_count == 1
    rec.wait_kind = wait_kind
    rec.wait_resource_id = resource_id
  end
end

#register_run(key, step_run_id:, invocation_count:, now:, eviction_policy:) ⇒ Object

Register a new durable run invocation.



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 58

def register_run(key, step_run_id:, invocation_count:, now:, eviction_policy:)
  @monitor.synchronize do
    @runs[key] = DurableRunRecord.new(
      key: key,
      step_run_id: step_run_id,
      invocation_count: invocation_count,
      eviction_policy: eviction_policy,
      registered_at: now,
    )
  end
end

#select_eviction_candidate(now:, durable_slots:, reserve_slots:, min_wait_for_capacity_eviction:) ⇒ String?

Select an eviction candidate, preferring TTL-eligible candidates first, then capacity-pressure candidates (only when above the waiting capacity threshold).

Returns:

  • (String, nil)

    The action key of the chosen candidate, or nil



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 132

def select_eviction_candidate(now:, durable_slots:, reserve_slots:, min_wait_for_capacity_eviction:)
  @monitor.synchronize do
    waiting = @runs.values.select do |r|
      r.waiting? && !r.eviction_policy.nil?
    end
    return nil if waiting.empty?

    ttl_eligible = waiting.select do |r|
      policy = r.eviction_policy
      policy&.ttl && r.waiting_since && (now - r.waiting_since) >= policy.ttl
    end

    unless ttl_eligible.empty?
      chosen = ttl_eligible.min_by do |r|
        [r.eviction_policy ? r.eviction_policy.priority : 0, r.waiting_since || now]
      end
      ttl = chosen.eviction_policy&.ttl
      chosen.eviction_reason = DurableEvictionCache.build_eviction_reason(
        EvictionCause::TTL_EXCEEDED, chosen, ttl: ttl,
      )
      return chosen.key
    end

    return nil unless capacity_pressure?(durable_slots, reserve_slots, waiting.length)

    capacity_candidates = waiting.select do |r|
      r.eviction_policy&.allow_capacity_eviction &&
        r.waiting_since &&
        (now - r.waiting_since) >= min_wait_for_capacity_eviction
    end
    return nil if capacity_candidates.empty?

    chosen = capacity_candidates.min_by do |r|
      [r.eviction_policy ? r.eviction_policy.priority : 0, r.waiting_since || now]
    end
    chosen.eviction_reason = DurableEvictionCache.build_eviction_reason(
      EvictionCause::CAPACITY_PRESSURE, chosen,
    )
    chosen.key
  end
end

#unregister_run(key) ⇒ Object

Unregister a durable run invocation.



71
72
73
# File 'lib/hatchet/worker/durable_eviction/cache.rb', line 71

def unregister_run(key)
  @monitor.synchronize { @runs.delete(key) }
end