Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore
- Inherits: Object
  - Object
    - Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore
- Includes: Logging::Helper
- Defined in: lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb
Overview
Write-through durable store backed by Legion::Data (PostgreSQL or MySQL). All writes go directly to the database — no in-memory dirty tracking, no flush. Scoped by tenant_id and agent_id so multiple agents can share the same DB tables safely.
Constant Summary
- TRACES_TABLE = :memory_traces
- ASSOCIATIONS_TABLE = :memory_associations
Instance Method Summary
- #all_traces(min_strength: 0.0) ⇒ Object
  Return all traces for this tenant.
- #associations_for(trace_id) ⇒ Object
  Return the set of trace IDs associated with a given trace (bidirectional).
- #count ⇒ Object
- #db_ready? ⇒ Boolean
  Returns true when both required tables exist and the user has INSERT privilege on both.
- #delete(trace_id) ⇒ Object
  Delete a trace and its association rows.
- #delete_least_recently_used(trace_type:, count:) ⇒ Object
  Delete the N least-recently-used traces for a given type (quota enforcement).
- #delete_lowest_confidence(trace_type:, count:) ⇒ Object
  Delete the N traces with the lowest confidence for a given type (quota enforcement).
- #firmware_traces ⇒ Object
  Convenience: retrieve firmware-type traces.
- #flush ⇒ Object
  No-op: this store is write-through, so there is nothing to flush.
- #initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore (constructor)
  A new instance of PostgresStore.
- #record_coactivation(id_a, id_b) ⇒ Object
  Create or increment a coactivation association between two traces.
- #retrieve(trace_id) ⇒ Object (also: #get)
  Retrieve a single trace by trace_id (tenant/agent scoped).
- #retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object
  Retrieve traces whose domain_tags JSON array contains the given tag.
- #retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object
  Retrieve traces by type, ordered by strength descending.
- #store(trace) ⇒ Object
  Store (upsert) a trace by trace_id.
- #update(trace_id, **fields) ⇒ Object
  Partial update of a trace by trace_id.
- #walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
  BFS traversal starting from start_id.
Constructor Details
#initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore
Returns a new instance of PostgresStore.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 20

def initialize(tenant_id: nil, agent_id: nil)
  @tenant_id = tenant_id
  @agent_id = agent_id || resolve_agent_id
end
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
Return all traces for this tenant.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 110

def all_traces(min_strength: 0.0)
  return [] unless db_ready?

  ds = traces_ds
  ds = ds.where { strength >= min_strength } if min_strength > 0.0
  ds.all.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("all_traces failed: #{e.message}")
  []
end
#associations_for(trace_id) ⇒ Object
Return the set of trace IDs associated with a given trace (bidirectional).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 177

def associations_for(trace_id)
  return [] unless db_ready?

  a_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_a: trace_id)
           .select_map(:trace_id_b)
  b_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_b: trace_id)
           .select_map(:trace_id_a)
  (a_side + b_side).uniq
rescue StandardError => e
  log_warn("associations_for failed: #{e.message}")
  []
end
#count ⇒ Object
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 265

def count
  return 0 unless db_ready?

  traces_ds.count
rescue StandardError => e
  log_warn("count failed: #{e.message}")
  0
end
#db_ready? ⇒ Boolean
Returns true when both required tables exist and the user has INSERT privilege on both.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 278

def db_ready?
  defined?(Legion::Data) &&
    Legion::Data.respond_to?(:connection) &&
    Legion::Data.connection&.table_exists?(TRACES_TABLE) &&
    Legion::Data.connection.table_exists?(ASSOCIATIONS_TABLE) &&
    Legion::Data.can_write?(TRACES_TABLE) &&
    Legion::Data.can_write?(ASSOCIATIONS_TABLE)
rescue StandardError => e
  log.error "[trace_persistence] db_ready?: #{e.message}"
  false
end
#delete(trace_id) ⇒ Object
Delete a trace and its association rows.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 122

def delete(trace_id)
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  return unless db_ready?

  db[ASSOCIATIONS_TABLE].where(trace_id_a: trace_id).delete
  db[ASSOCIATIONS_TABLE].where(trace_id_b: trace_id).delete
  db[TRACES_TABLE].where(trace_id: trace_id).delete
rescue StandardError => e
  log_warn("delete failed: #{e.message}")
end
#delete_least_recently_used(trace_type:, count:) ⇒ Object
Delete the N least-recently-used traces for a given type (quota enforcement).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 246

def delete_least_recently_used(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:last_reinforced)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_least_recently_used failed: #{e.message}")
end
#delete_lowest_confidence(trace_type:, count:) ⇒ Object
Delete the N traces with the lowest confidence for a given type (quota enforcement).
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 231

def delete_lowest_confidence(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:confidence)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_lowest_confidence failed: #{e.message}")
end
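Both quota-enforcement methods share one shape: filter by type, sort ascending on a single column, take the first N ids, then delete each one. The sketch below mirrors only the selection step over an in-memory array. `eviction_ids` and the sample fields are hypothetical, integers stand in for timestamps, and every row is assumed to have a non-nil value in the sort column.

```ruby
# Hypothetical helper: choose which trace ids to evict for one trace type.
# `by` is the column to sort on (:confidence or :last_reinforced).
def eviction_ids(traces, trace_type:, count:, by:)
  traces.select { |t| t[:trace_type] == trace_type.to_s }
        .sort_by { |t| t[by] }   # ascending: lowest / oldest first
        .first(count)
        .map { |t| t[:trace_id] }
end

traces = [
  { trace_id: 't1', trace_type: 'semantic', confidence: 0.9, last_reinforced: 5 },
  { trace_id: 't2', trace_type: 'semantic', confidence: 0.2, last_reinforced: 30 },
  { trace_id: 't3', trace_type: 'semantic', confidence: 0.5, last_reinforced: 20 },
  { trace_id: 't4', trace_type: 'firmware', confidence: 0.1, last_reinforced: 1 }
]

eviction_ids(traces, trace_type: :semantic, count: 2, by: :confidence)      # => ["t2", "t3"]
eviction_ids(traces, trace_type: :semantic, count: 2, by: :last_reinforced) # => ["t1", "t3"]
```

Because the real methods call #delete once per id, each eviction also removes the trace's association rows and hot-tier entry.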
#firmware_traces ⇒ Object
Convenience: retrieve firmware-type traces.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 261

def firmware_traces
  retrieve_by_type(:firmware)
end
#flush ⇒ Object
No-op: this store is write-through, so there is nothing to flush.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 275

def flush; end
#record_coactivation(id_a, id_b) ⇒ Object
Create or increment a coactivation association between two traces.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 145

def record_coactivation(id_a, id_b)
  return unless db_ready?
  return if id_a == id_b

  now = Time.now.utc
  existing = db[ASSOCIATIONS_TABLE]
             .where(trace_id_a: id_a, trace_id_b: id_b)
             .first

  if existing
    db[ASSOCIATIONS_TABLE]
      .where(id: existing[:id])
      .update(
        coactivation_count: existing[:coactivation_count] + 1,
        updated_at: now
      )
  else
    db[ASSOCIATIONS_TABLE].insert(
      trace_id_a: id_a,
      trace_id_b: id_b,
      coactivation_count: 1,
      linked: false,
      tenant_id: @tenant_id,
      created_at: now,
      updated_at: now
    )
  end
rescue StandardError => e
  log_warn("record_coactivation failed: #{e.message}")
end
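The lookup in the listing above matches only the ordered pair (id_a, id_b), so recording the same two traces in the opposite order produces a second row, while #associations_for merges both directions on read. A minimal in-memory sketch of that behaviour, assuming a Hash keyed by the ordered pair stands in for the associations table (`record_coactivation_sketch` and `neighbors_sketch` are hypothetical names):

```ruby
# Hypothetical in-memory model of the associations table, keyed by the
# ordered pair [trace_id_a, trace_id_b], as the lookup in the source does.
def record_coactivation_sketch(table, id_a, id_b)
  return table if id_a == id_b # self-associations are ignored

  key = [id_a, id_b]
  if table.key?(key)
    table[key][:coactivation_count] += 1
  else
    table[key] = { coactivation_count: 1, linked: false }
  end
  table
end

# Union of both directions, analogous to #associations_for.
def neighbors_sketch(table, trace_id)
  table.keys.select { |pair| pair.include?(trace_id) }
       .map { |a, b| a == trace_id ? b : a }
       .uniq
end
```

In this model, recording ('x', 'y') twice and ('y', 'x') once leaves two rows with counts 2 and 1, yet `neighbors_sketch(table, 'x')` still returns a single neighbour. The real method also reads and then writes in two separate statements, so concurrent callers could presumably lose an increment; the sketch does not model that.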
#retrieve(trace_id) ⇒ Object Also known as: get
Retrieve a single trace by trace_id (tenant/agent scoped). Checks the Redis hot tier first; falls through to DB on a miss and caches the result. Returns a trace hash or nil.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 48

def retrieve(trace_id)
  if HotTier.available?
    cached = HotTier.fetch_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id)
    return cached if cached
  end

  return nil unless db_ready?

  row = traces_ds.where(trace_id: trace_id).first
  trace = row ? deserialize_trace(row) : nil
  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available? && trace
  trace
rescue StandardError => e
  log_warn("retrieve failed: #{e.message}")
  nil
end
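The lookup order described for #retrieve (hot tier first, database on a miss, then cache the result) is a read-through cache, and #update pairs with it by evicting the cached entry. The sketch below illustrates the pattern with plain Hashes standing in for Redis and the traces table. `ReadThroughSketch` and its `db_reads` counter are hypothetical and not part of the library.

```ruby
# Hypothetical stand-in: Hashes replace the Redis hot tier and the DB table.
class ReadThroughSketch
  attr_reader :db_reads

  def initialize(rows)
    @db = rows    # trace_id => trace hash (stands in for memory_traces)
    @hot = {}     # stands in for the hot tier
    @db_reads = 0 # counts how often the "database" is consulted
  end

  def retrieve(trace_id)
    cached = @hot[trace_id]
    return cached if cached

    @db_reads += 1
    trace = @db[trace_id]
    @hot[trace_id] = trace if trace # only rows that were found are cached
    trace
  end

  # Change the row, then evict so a stale cached copy cannot be served.
  def update(trace_id, **fields)
    return unless @db.key?(trace_id)

    @db[trace_id] = @db[trace_id].merge(fields)
    @hot.delete(trace_id)
  end
end
```

A second `retrieve` of the same id is answered from the hot tier and leaves `db_reads` unchanged; after `update`, the next `retrieve` goes back to the database and re-caches the fresh row.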
#retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object
Retrieve traces whose domain_tags JSON array contains the given tag. Uses PostgreSQL JSON containment operator (@>) when available for exact array-member matching; falls back to LIKE for other adapters.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 86

def retrieve_by_domain(tag, min_strength: 0.0, limit: 50)
  return [] unless db_ready?

  ds = traces_ds
  if db.adapter_scheme == :postgres
    # JSONB @> operator matches exact array elements, not substrings
    json_array = ::JSON.dump([tag])
    ds = ds.where(Sequel.lit("domain_tags::jsonb @> '#{json_array}'::jsonb"))
  else
    # Fallback: substring match (imprecise but broadly compatible)
    ds = ds.where(Sequel.like(:domain_tags, "%#{tag}%"))
  end
  rows = ds
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_domain failed: #{e.message}")
  []
end
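The two branches can return different rows for the same tag: containment matches whole array elements, while the LIKE fallback matches any substring of the serialized JSON text. A small pure-Ruby illustration, assuming domain_tags is stored as a JSON-encoded array string (the helper names and sample rows are hypothetical):

```ruby
require 'json'

# Exact array-member match, analogous to the JSONB @> branch.
def exact_member_ids(rows, tag)
  rows.select { |r| JSON.parse(r[:domain_tags]).include?(tag) }
      .map { |r| r[:trace_id] }
end

# Substring match on the raw JSON text, analogous to the LIKE '%tag%' fallback.
def substring_ids(rows, tag)
  rows.select { |r| r[:domain_tags].include?(tag) }
      .map { |r| r[:trace_id] }
end

rows = [
  { trace_id: 'a', domain_tags: JSON.dump(%w[net storage]) },
  { trace_id: 'b', domain_tags: JSON.dump(%w[network]) }
]

exact_member_ids(rows, 'net') # => ["a"]
substring_ids(rows, 'net')    # => ["a", "b"]
```

On non-PostgreSQL adapters, callers that need exact tag semantics may therefore want to re-check the returned traces' domain_tags themselves.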
#retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object
Retrieve traces by type, ordered by strength descending.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 68

def retrieve_by_type(type, limit: 100, min_strength: 0.0)
  return [] unless db_ready?

  rows = traces_ds
         .where(trace_type: type.to_s)
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_type failed: #{e.message}")
  []
end
#store(trace) ⇒ Object
Store (upsert) a trace by trace_id. Returns the trace_id on success, nil if the DB is not ready.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 27

def store(trace)
  return nil unless db_ready?

  normalized_trace = Helpers::Trace.normalize_trace_affect(trace)
  row = serialize_trace(normalized_trace)
  ds = db[TRACES_TABLE]
  if db.adapter_scheme == :mysql2
    ds.insert_conflict(update: row.except(:trace_id)).insert(row)
  else
    ds.insert_conflict(target: :trace_id, update: row.except(:trace_id)).insert(row)
  end
  HotTier.cache_trace(normalized_trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  normalized_trace[:trace_id]
rescue StandardError => e
  log_warn("store failed: #{e.message}")
  nil
end
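The upsert in the listing above inserts the row, or, when a row with the same trace_id already exists, overwrites every other column with the new values. A minimal in-memory sketch of that semantics, assuming a Hash keyed by trace_id stands in for the traces table (`upsert_sketch` is a hypothetical name and does not call Sequel):

```ruby
# Hypothetical model of "insert, or update on trace_id conflict".
def upsert_sketch(table, row)
  id = row.fetch(:trace_id)
  if table.key?(id)
    # Conflict: update every column except the key itself.
    table[id].merge!(row.reject { |column, _| column == :trace_id })
  else
    table[id] = row.dup
  end
  id
end

table = {}
upsert_sketch(table, trace_id: 't1', strength: 0.4, trace_type: 'semantic')
upsert_sketch(table, trace_id: 't1', strength: 0.7, trace_type: 'semantic')
table.size           # => 1
table['t1'][:strength] # => 0.7
```

Storing the same trace repeatedly is therefore idempotent with respect to row count; only the column values change.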
#update(trace_id, **fields) ⇒ Object
Partial update of a trace by trace_id. Evicts the hot-tier entry so a stale cached version cannot be served.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 135

def update(trace_id, **fields)
  return unless db_ready?

  db[TRACES_TABLE].where(trace_id: trace_id).update(map_update_fields(fields))
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
rescue StandardError => e
  log_warn("update failed: #{e.message}")
end
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
BFS traversal starting from start_id. Returns an array of { trace_id:, depth:, path: } hashes.
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 194

def walk_associations(start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless db_ready?

  start_row = traces_ds.where(trace_id: start_id).first
  return [] unless start_row

  results = []
  visited = Set.new([start_id])
  queue   = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift
    neighbor_ids = associations_for(current_id)

    neighbor_ids.each do |nid|
      next if visited.include?(nid)

      neighbor_row = traces_ds
                     .where(trace_id: nid)
                     .where { strength >= min_strength }
                     .first
      next unless neighbor_row

      visited << nid
      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
rescue StandardError => e
  log_warn("walk_associations failed: #{e.message}")
  []
end
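To make the result shape concrete, here is the same breadth-first walk over an in-memory graph. This is a sketch under stated assumptions: `walk_sketch`, the `graph` adjacency Hash (already bidirectional), and the `strengths` Hash are hypothetical stand-ins for #associations_for and the strength-filtered trace lookup.

```ruby
require 'set'

# graph:     trace_id => array of neighbor ids
# strengths: trace_id => Float strength of that trace
def walk_sketch(graph, strengths, start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless strengths.key?(start_id) # unknown start trace

  results = []
  visited = Set.new([start_id])
  queue = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift
    graph.fetch(current_id, []).each do |nid|
      next if visited.include?(nid)
      next unless strengths.fetch(nid, 0.0) >= min_strength # weak traces are skipped

      visited << nid
      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end
  results
end

graph = { 'a' => %w[b c], 'b' => %w[a d], 'c' => %w[a], 'd' => %w[b] }
strengths = { 'a' => 1.0, 'b' => 0.5, 'c' => 0.05, 'd' => 0.8 }

walk_sketch(graph, strengths, start_id: 'a')
# => [{ trace_id: "b", depth: 1, path: ["a", "b"] },
#     { trace_id: "d", depth: 2, path: ["a", "b", "d"] }]
```

Two properties follow from the control flow: the start trace itself is never included in the results, and a neighbour below min_strength is neither reported nor expanded, so traces reachable only through it are not visited.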