Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore
- Inherits: Object
  - Object
  - Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore
- Defined in: lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb
Overview
Write-through durable store backed by Legion::Data (PostgreSQL or MySQL). All writes go directly to the database — no in-memory dirty tracking, no flush. Scoped by tenant_id and agent_id so multiple agents can share the same DB tables safely.
Constant Summary
- TRACES_TABLE = :memory_traces
- ASSOCIATIONS_TABLE = :memory_associations
Instance Method Summary
-
#all_traces(min_strength: 0.0) ⇒ Object
Return all traces for this tenant.
-
#associations_for(trace_id) ⇒ Object
Return the set of trace IDs associated with a given trace (bidirectional).
- #count ⇒ Object
-
#db_ready? ⇒ Boolean
Returns true when both required tables exist in the connected DB.
-
#delete(trace_id) ⇒ Object
Delete a trace and its association rows.
-
#delete_least_recently_used(trace_type:, count:) ⇒ Object
Delete the N least-recently-used traces for a given type (quota enforcement).
-
#delete_lowest_confidence(trace_type:, count:) ⇒ Object
Delete the N traces with the lowest confidence for a given type (quota enforcement).
-
#firmware_traces ⇒ Object
Convenience: retrieve firmware-type traces.
-
#flush ⇒ Object
No-op — this store is write-through; nothing to flush.
-
#initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore
constructor
A new instance of PostgresStore.
-
#record_coactivation(id_a, id_b) ⇒ Object
Create or increment a coactivation association between two traces.
-
#retrieve(trace_id) ⇒ Object
(also: #get)
Retrieve a single trace by trace_id (tenant/agent scoped).
-
#retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object
Retrieve traces whose domain_tags column contains the given tag string.
-
#retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object
Retrieve traces by type, ordered by strength descending.
-
#store(trace) ⇒ Object
Store (upsert) a trace by trace_id.
-
#update(trace_id, **fields) ⇒ Object
Partial update of a trace by trace_id.
-
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
BFS traversal starting from start_id.
Constructor Details
#initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore
Returns a new instance of PostgresStore.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 18

def initialize(tenant_id: nil, agent_id: nil)
  @tenant_id = tenant_id
  @agent_id  = agent_id || resolve_agent_id
end
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
Return all traces for this tenant.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 97

def all_traces(min_strength: 0.0)
  return [] unless db_ready?

  ds = traces_ds
  ds = ds.where { strength >= min_strength } if min_strength > 0.0
  ds.all.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("all_traces failed: #{e.message}")
  []
end
#associations_for(trace_id) ⇒ Object
Return the set of trace IDs associated with a given trace (bidirectional).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 164

def associations_for(trace_id)
  return [] unless db_ready?

  a_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_a: trace_id)
           .select_map(:trace_id_b)
  b_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_b: trace_id)
           .select_map(:trace_id_a)
  (a_side + b_side).uniq
rescue StandardError => e
  log_warn("associations_for failed: #{e.message}")
  []
end
#count ⇒ Object
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 252

def count
  return 0 unless db_ready?

  traces_ds.count
rescue StandardError => e
  log_warn("count failed: #{e.message}")
  0
end
#db_ready? ⇒ Boolean
Returns true when both required tables exist in the connected DB.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 265

def db_ready?
  defined?(Legion::Data) &&
    Legion::Data.respond_to?(:connection) &&
    Legion::Data.connection&.table_exists?(TRACES_TABLE) &&
    Legion::Data.connection.table_exists?(ASSOCIATIONS_TABLE)
rescue StandardError => _e
  false
end
#delete(trace_id) ⇒ Object
Delete a trace and its association rows.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 109

def delete(trace_id)
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  return unless db_ready?

  db[ASSOCIATIONS_TABLE].where(trace_id_a: trace_id).delete
  db[ASSOCIATIONS_TABLE].where(trace_id_b: trace_id).delete
  db[TRACES_TABLE].where(trace_id: trace_id).delete
rescue StandardError => e
  log_warn("delete failed: #{e.message}")
end
#delete_least_recently_used(trace_type:, count:) ⇒ Object
Delete the N least-recently-used traces for a given type (quota enforcement).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 233

def delete_least_recently_used(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:last_reinforced)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_least_recently_used failed: #{e.message}")
end
#delete_lowest_confidence(trace_type:, count:) ⇒ Object
Delete the N traces with the lowest confidence for a given type (quota enforcement).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 218

def delete_lowest_confidence(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:confidence)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_lowest_confidence failed: #{e.message}")
end
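The two quota-enforcement methods share one pattern: filter by type, order ascending by a column, take the first N IDs, then delete each. The sketch below reproduces only the selection step over an in-memory array. The `SAMPLE_TRACES` data and the `eviction_candidates` helper are hypothetical names introduced for illustration and are not part of PostgresStore.

```ruby
# Hypothetical sample rows; only the columns used for ordering are included.
SAMPLE_TRACES = [
  { trace_id: 'a', trace_type: 'episodic', confidence: 0.9, last_reinforced: Time.utc(2024, 1, 3) },
  { trace_id: 'b', trace_type: 'episodic', confidence: 0.2, last_reinforced: Time.utc(2024, 1, 1) },
  { trace_id: 'c', trace_type: 'episodic', confidence: 0.5, last_reinforced: Time.utc(2024, 1, 2) },
  { trace_id: 'd', trace_type: 'semantic', confidence: 0.1, last_reinforced: Time.utc(2023, 1, 1) }
].freeze

# Pick the IDs of the `count` traces of one type with the smallest value in
# column `by` (ascending order, like `.order(col).limit(count)`).
def eviction_candidates(traces, trace_type:, count:, by:)
  traces
    .select { |t| t[:trace_type] == trace_type.to_s }
    .sort_by { |t| t[by] }
    .first(count)
    .map { |t| t[:trace_id] }
end

LOWEST_CONFIDENCE = eviction_candidates(SAMPLE_TRACES, trace_type: :episodic, count: 2, by: :confidence)
LEAST_RECENT      = eviction_candidates(SAMPLE_TRACES, trace_type: :episodic, count: 1, by: :last_reinforced)
```

With this sample data, `LOWEST_CONFIDENCE` is `["b", "c"]` and `LEAST_RECENT` is `["b"]`; the `semantic` trace is never a candidate because selection is per type.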
#firmware_traces ⇒ Object
Convenience: retrieve firmware-type traces.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 248

def firmware_traces
  retrieve_by_type(:firmware)
end
#flush ⇒ Object
No-op — this store is write-through; nothing to flush.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 262

def flush; end
#record_coactivation(id_a, id_b) ⇒ Object
Create or increment a coactivation association between two traces.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 132

def record_coactivation(id_a, id_b)
  return unless db_ready?
  return if id_a == id_b

  now = Time.now.utc
  existing = db[ASSOCIATIONS_TABLE]
             .where(trace_id_a: id_a, trace_id_b: id_b)
             .first

  if existing
    db[ASSOCIATIONS_TABLE]
      .where(id: existing[:id])
      .update(
        coactivation_count: existing[:coactivation_count] + 1,
        updated_at:         now
      )
  else
    db[ASSOCIATIONS_TABLE].insert(
      trace_id_a:         id_a,
      trace_id_b:         id_b,
      coactivation_count: 1,
      linked:             false,
      tenant_id:          @tenant_id,
      created_at:         now,
      updated_at:         now
    )
  end
rescue StandardError => e
  log_warn("record_coactivation failed: #{e.message}")
end
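The create-or-increment logic can be illustrated without a database. In the listing above the lookup matches on the exact `(trace_id_a, trace_id_b)` order, so `(a, b)` and `(b, a)` are counted in separate rows, while `associations_for` reads both directions. The `CoactivationSketch` class below is a hypothetical in-memory stand-in that mimics that behavior; it is not the store's implementation.

```ruby
# Hypothetical in-memory model of the associations table.
class CoactivationSketch
  def initialize
    @rows = {} # key: [id_a, id_b] in the order given, value: coactivation count
  end

  # Insert a row with count 1, or increment the existing row for this exact order.
  def record(id_a, id_b)
    return if id_a == id_b # self-links are ignored, as in record_coactivation

    key = [id_a, id_b]
    @rows[key] = @rows.fetch(key, 0) + 1
  end

  def count_for(id_a, id_b)
    @rows.fetch([id_a, id_b], 0)
  end

  # Bidirectional read: neighbors where `id` appears on either side.
  def associations_for(id)
    @rows.keys.select { |pair| pair.include?(id) }.flatten.uniq - [id]
  end
end

sketch = CoactivationSketch.new
sketch.record('t1', 't2')
sketch.record('t1', 't2')
sketch.record('t2', 't1') # reversed order creates a separate row
sketch.record('t1', 't1') # no-op
COACTIVATION_DEMO = sketch
```

Callers that want a single counter per pair could normalize the argument order before calling; whether the real callers do so is not shown in this page.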
#retrieve(trace_id) ⇒ Object Also known as: get
Retrieve a single trace by trace_id (tenant/agent scoped). Checks the Redis hot tier first; falls through to DB on a miss and caches the result. Returns a trace hash or nil.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 45

def retrieve(trace_id)
  if HotTier.available?
    cached = HotTier.fetch_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id)
    return cached if cached
  end

  return nil unless db_ready?

  row = traces_ds.where(trace_id: trace_id).first
  trace = row ? deserialize_trace(row) : nil
  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available? && trace
  trace
rescue StandardError => e
  log_warn("retrieve failed: #{e.message}")
  nil
end
#retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object
Retrieve traces whose domain_tags column contains the given tag string.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 81

def retrieve_by_domain(tag, min_strength: 0.0, limit: 50)
  return [] unless db_ready?

  rows = traces_ds
         .where(Sequel.like(:domain_tags, "%#{tag}%"))
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_domain failed: #{e.message}")
  []
end
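Because the filter is a `LIKE '%tag%'` pattern, it matches any row whose serialized `domain_tags` text contains the tag as a substring, not only rows that carry the tag as a whole element. The sketch below contrasts the two behaviors in plain Ruby. It assumes, purely for illustration, that `domain_tags` is stored as a JSON array string; the actual serialization is done by `serialize_trace`, which is not shown on this page. `TAG_ROWS`, `like_match`, and `exact_match` are hypothetical names.

```ruby
require 'json'

# Hypothetical rows; domain_tags is ASSUMED to be a JSON-encoded array string.
TAG_ROWS = [
  { trace_id: 't1', domain_tags: JSON.generate(%w[network dns]) },
  { trace_id: 't2', domain_tags: JSON.generate(%w[net]) },
  { trace_id: 't3', domain_tags: JSON.generate(%w[storage]) }
].freeze

# Substring semantics, comparable to LIKE '%tag%' on the serialized column.
def like_match(rows, tag)
  rows.select { |r| r[:domain_tags].include?(tag) }.map { |r| r[:trace_id] }
end

# Whole-element semantics, for comparison: decode first, then test membership.
def exact_match(rows, tag)
  rows.select { |r| JSON.parse(r[:domain_tags]).include?(tag) }.map { |r| r[:trace_id] }
end

LIKE_IDS  = like_match(TAG_ROWS, 'net')
EXACT_IDS = exact_match(TAG_ROWS, 'net')
```

Here `LIKE_IDS` is `["t1", "t2"]` because `"network"` contains `"net"`, while `EXACT_IDS` is `["t2"]`. Callers that need whole-tag matching may want to post-filter the deserialized results.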
#retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object
Retrieve traces by type, ordered by strength descending.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 65

def retrieve_by_type(type, limit: 100, min_strength: 0.0)
  return [] unless db_ready?

  rows = traces_ds
         .where(trace_type: type.to_s)
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_type failed: #{e.message}")
  []
end
#store(trace) ⇒ Object
Store (upsert) a trace by trace_id. Returns the trace_id on success, nil if the DB is not ready.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 25

def store(trace)
  return nil unless db_ready?

  row = serialize_trace(trace)
  ds  = db[TRACES_TABLE]
  if db.adapter_scheme == :mysql2
    ds.insert_conflict(update: row.except(:trace_id)).insert(row)
  else
    ds.insert_conflict(target: :trace_id, update: row.except(:trace_id)).insert(row)
  end
  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  trace[:trace_id]
rescue StandardError => e
  log_warn("store failed: #{e.message}")
  nil
end
#update(trace_id, **fields) ⇒ Object
Partial update of a trace by trace_id. Evicts the hot-tier entry so a stale cached version cannot be served.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 122

def update(trace_id, **fields)
  return unless db_ready?

  db[TRACES_TABLE].where(trace_id: trace_id).update(map_update_fields(fields))
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
rescue StandardError => e
  log_warn("update failed: #{e.message}")
end
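The interplay between `#retrieve` (read through the hot tier, cache on a miss) and `#update` (write to the database, then evict) can be sketched with two plain hashes. `ReadThroughSketch` is a hypothetical class written for this illustration; the real `HotTier` module is Redis-backed according to the method description, and its internals are not shown on this page.

```ruby
# Hypothetical model: @db stands in for the traces table, @cache for the hot tier.
class ReadThroughSketch
  attr_reader :db_reads

  def initialize(db_rows)
    @db = db_rows.dup
    @cache = {}
    @db_reads = 0 # counts how often the "database" is consulted
  end

  # Serve from the cache when possible; otherwise read the DB and cache the hit.
  def retrieve(trace_id)
    cached = @cache[trace_id]
    return cached if cached

    @db_reads += 1
    trace = @db[trace_id]
    @cache[trace_id] = trace if trace
    trace
  end

  # Write to the DB, then evict so the next read cannot see the stale copy.
  def update(trace_id, **fields)
    return unless @db.key?(trace_id)

    @db[trace_id] = @db[trace_id].merge(fields)
    @cache.delete(trace_id)
  end
end

demo = ReadThroughSketch.new('t1' => { trace_id: 't1', strength: 0.5 })
demo.retrieve('t1')            # miss: reads the DB and fills the cache
demo.retrieve('t1')            # hit: served from the cache
demo.update('t1', strength: 0.9)
READ_THROUGH_RESULT = demo.retrieve('t1') # miss again after eviction
READ_THROUGH_DEMO = demo
```

Evicting rather than rewriting the cached entry keeps `update` simple: the partial field set never has to be merged into a cached object, at the cost of one extra database read on the next `retrieve`.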
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
BFS traversal starting from start_id. Returns an array of { trace_id:, depth:, path: } hashes.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 181

def walk_associations(start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless db_ready?

  start_row = traces_ds.where(trace_id: start_id).first
  return [] unless start_row

  results = []
  visited = Set.new([start_id])
  queue   = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift

    neighbor_ids = associations_for(current_id)
    neighbor_ids.each do |nid|
      next if visited.include?(nid)

      neighbor_row = traces_ds
                     .where(trace_id: nid)
                     .where { strength >= min_strength }
                     .first
      next unless neighbor_row

      visited << nid
      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
rescue StandardError => e
  log_warn("walk_associations failed: #{e.message}")
  []
end
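To make the shape of the return value concrete, the traversal can be replayed over a small in-memory graph. The sketch below keeps the same queue, visited-set, and hop-limit logic as the listing, but replaces the database lookups with the hypothetical constants `EDGES` and `STRENGTH` and the hypothetical helpers `neighbors` and `walk`.

```ruby
require 'set'

# Hypothetical stand-ins for the associations and traces tables.
EDGES    = [%w[t1 t2], %w[t2 t3], %w[t3 t4], %w[t1 t5]].freeze
STRENGTH = { 't1' => 0.9, 't2' => 0.8, 't3' => 0.5, 't4' => 0.05, 't5' => 0.4 }.freeze

# Bidirectional neighbor lookup, in the spirit of associations_for.
def neighbors(id)
  EDGES.select { |pair| pair.include?(id) }.flatten.uniq - [id]
end

# Breadth-first walk that skips neighbors below min_strength and stops
# expanding once max_hops is reached.
def walk(start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless STRENGTH.key?(start_id)

  results = []
  visited = Set.new([start_id])
  queue   = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift

    neighbors(current_id).each do |nid|
      next if visited.include?(nid)
      next unless STRENGTH.fetch(nid, 0.0) >= min_strength

      visited << nid
      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
end

WALK_RESULT = walk(start_id: 't1')
```

For this graph `WALK_RESULT` contains `t2` and `t5` at depth 1 and `t3` at depth 2 with path `["t1", "t2", "t3"]`. `t4` is absent because its strength (0.05) is below the default `min_strength` of 0.1, and the start node itself is never included in the results.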