Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore

Inherits:
Object
  • Object
show all
Includes:
Logging::Helper
Defined in:
lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb

Overview

Write-through durable store backed by Legion::Data (PostgreSQL or MySQL). All writes go directly to the database — no in-memory dirty tracking, no flush. Scoped by tenant_id and agent_id so multiple agents can share the same DB tables safely.

Constant Summary collapse

TRACES_TABLE =
:memory_traces
ASSOCIATIONS_TABLE =
:memory_associations

Instance Method Summary collapse

Constructor Details

#initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore

Returns a new instance of PostgresStore.



20
21
22
23
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 20

# Builds a store bound to one tenant/agent pair.
#
# @param tenant_id [Object, nil] stored unchanged in @tenant_id
# @param agent_id [Object, nil] stored in @agent_id; when nil (or false) the
#   value returned by resolve_agent_id is used instead
def initialize(tenant_id: nil, agent_id: nil)
  @tenant_id = tenant_id
  # Only fall back to the helper when the caller did not supply an agent id.
  @agent_id  = agent_id || resolve_agent_id
end

Instance Method Details

#all_traces(min_strength: 0.0) ⇒ Object

Return all traces for this tenant.



99
100
101
102
103
104
105
106
107
108
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 99

# Load every row reachable through traces_ds and deserialize it.
#
# The strength filter is only added to the query when min_strength is
# strictly greater than 0.0; otherwise the dataset is read unfiltered.
#
# @param min_strength [Numeric] lower bound applied to the strength column
# @return [Array] deserialized traces; [] when the DB is not ready or on error
def all_traces(min_strength: 0.0)
  return [] unless db_ready?

  rows =
    if min_strength > 0.0
      traces_ds.where { strength >= min_strength }.all
    else
      traces_ds.all
    end
  rows.map { |row| deserialize_trace(row) }
rescue StandardError => e
  log_warn("all_traces failed: #{e.message}")
  []
end

#associations_for(trace_id) ⇒ Object

Return the set of trace IDs associated with a given trace (bidirectional).



166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 166

# Collect the ids on the opposite end of every association row that mentions
# trace_id, looking at both column orientations.
#
# @param trace_id [Object] id matched against trace_id_a and trace_id_b
# @return [Array] de-duplicated partner ids (a-side matches first);
#   [] when the DB is not ready or on error
def associations_for(trace_id)
  return [] unless db_ready?

  # own column => column holding the partner id
  orientations = { trace_id_a: :trace_id_b, trace_id_b: :trace_id_a }
  partners = orientations.flat_map do |own_column, partner_column|
    db[ASSOCIATIONS_TABLE].where(own_column => trace_id).select_map(partner_column)
  end
  partners.uniq
rescue StandardError => e
  log_warn("associations_for failed: #{e.message}")
  []
end

#count ⇒ Object



254
255
256
257
258
259
260
261
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 254

# Number of rows in traces_ds.
#
# @return [Integer] the dataset count; 0 when the DB is not ready or on error
def count
  db_ready? ? traces_ds.count : 0
rescue StandardError => e
  log_warn("count failed: #{e.message}")
  0
end

#db_ready? ⇒ Boolean

Returns true when both required tables exist in the connected DB.

Returns:

  • (Boolean)


267
268
269
270
271
272
273
274
275
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 267

# Checks that Legion::Data is loaded, exposes a connection, and that both
# TRACES_TABLE and ASSOCIATIONS_TABLE exist on that connection.
#
# @return [Boolean, nil] truthy only when every check passes; a failed check
#   yields nil or false depending on which step failed. Any StandardError is
#   logged through log.error and reported as false.
def db_ready?
  return nil unless defined?(Legion::Data)
  return false unless Legion::Data.respond_to?(:connection)

  # &. keeps a missing connection from raising; it surfaces as nil instead.
  traces_present = Legion::Data.connection&.table_exists?(TRACES_TABLE)
  return traces_present unless traces_present

  Legion::Data.connection.table_exists?(ASSOCIATIONS_TABLE)
rescue StandardError => e
  log.error "[trace_persistence] db_ready?: #{e.message}"
  false
end

#delete(trace_id) ⇒ Object

Delete a trace and its association rows.



111
112
113
114
115
116
117
118
119
120
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 111

# Remove a trace together with every association row that references it.
#
# The hot-tier entry is evicted first (when the hot tier is available), even
# if the database turns out not to be ready. The three DELETE statements run
# inside one transaction so that a failure part-way through cannot leave a
# trace stripped of its associations.
#
# @param trace_id [Object] id matched against trace_id, trace_id_a and trace_id_b
# @return [Object, nil] the result of the final traces-table delete; nil when
#   the DB is not ready. Errors are logged through log_warn, not raised.
def delete(trace_id)
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  return unless db_ready?

  # All-or-nothing: associations and the trace row disappear together.
  db.transaction do
    db[ASSOCIATIONS_TABLE].where(trace_id_a: trace_id).delete
    db[ASSOCIATIONS_TABLE].where(trace_id_b: trace_id).delete
    db[TRACES_TABLE].where(trace_id: trace_id).delete
  end
rescue StandardError => e
  log_warn("delete failed: #{e.message}")
end

#delete_least_recently_used(trace_type:, count:) ⇒ Object

Delete the N least-recently-used traces for a given type (quota enforcement).



235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 235

# Quota helper: picks up to +count+ traces of the given type, ordered by
# ascending last_reinforced, and removes each one through #delete.
#
# @param trace_type [#to_s] compared against the trace_type column as a string
# @param count [Integer] maximum number of traces to remove
# @return [Array, nil] the ids handed to #delete; nil when the DB is not ready
def delete_least_recently_used(trace_type:, count:)
  return unless db_ready?

  oldest_first = traces_ds.where(trace_type: trace_type.to_s).order(:last_reinforced)
  oldest_first.limit(count).select_map(:trace_id).each { |victim_id| delete(victim_id) }
rescue StandardError => e
  log_warn("delete_least_recently_used failed: #{e.message}")
end

#delete_lowest_confidence(trace_type:, count:) ⇒ Object

Delete the N traces with the lowest confidence for a given type (quota enforcement).



220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 220

# Quota helper: picks up to +count+ traces of the given type, ordered by
# ascending confidence, and removes each one through #delete.
#
# @param trace_type [#to_s] compared against the trace_type column as a string
# @param count [Integer] maximum number of traces to remove
# @return [Array, nil] the ids handed to #delete; nil when the DB is not ready
def delete_lowest_confidence(trace_type:, count:)
  return unless db_ready?

  of_type = traces_ds.where(trace_type: trace_type.to_s)
  weakest = of_type.order(:confidence).limit(count)
  doomed  = weakest.select_map(:trace_id)
  doomed.each { |doomed_id| delete(doomed_id) }
rescue StandardError => e
  log_warn("delete_lowest_confidence failed: #{e.message}")
end

#firmware_traces ⇒ Object

Convenience: retrieve firmware-type traces.



250
251
252
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 250

# Shorthand for retrieve_by_type(:firmware); no other arguments are passed,
# so that method's own defaults apply.
#
# @return [Object] whatever retrieve_by_type returns for :firmware
def firmware_traces
  retrieve_by_type(:firmware)
end

#flush ⇒ Object

No-op — this store is write-through; nothing to flush.



264
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 264

# Intentionally empty: the body does nothing and the call returns nil.
# NOTE(review): presumably kept so this store can stand in for stores that
# buffer writes — confirm against callers.
def flush; end

#record_coactivation(id_a, id_b) ⇒ Object

Create or increment a coactivation association between two traces.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 134

# Create the (id_a, id_b) association row, or bump its coactivation counter
# when the row already exists. A trace is never associated with itself.
#
# The increment is expressed in SQL (coactivation_count = coactivation_count + 1)
# rather than computed from the value read a moment earlier, so two writers
# bumping the same pair at the same time both get counted.
#
# @param id_a [Object] stored in / matched against trace_id_a
# @param id_b [Object] stored in / matched against trace_id_b
# @return [Object, nil] the dataset's update/insert result; nil when the DB is
#   not ready or the ids are equal. Errors are logged through log_warn.
def record_coactivation(id_a, id_b)
  return unless db_ready?
  return if id_a == id_b

  now = Time.now.utc
  existing = db[ASSOCIATIONS_TABLE]
             .where(trace_id_a: id_a, trace_id_b: id_b)
             .first

  if existing
    db[ASSOCIATIONS_TABLE]
      .where(id: existing[:id])
      .update(
        # Column-relative increment: evaluated by the database at write time.
        coactivation_count: Sequel[:coactivation_count] + 1,
        updated_at:         now
      )
  else
    db[ASSOCIATIONS_TABLE].insert(
      trace_id_a:         id_a,
      trace_id_b:         id_b,
      coactivation_count: 1,
      linked:             false,
      tenant_id:          @tenant_id,
      created_at:         now,
      updated_at:         now
    )
  end
rescue StandardError => e
  log_warn("record_coactivation failed: #{e.message}")
end

#retrieve(trace_id) ⇒ Object Also known as: get

Retrieve a single trace by trace_id (tenant/agent scoped). Checks the Redis hot tier first; falls through to DB on a miss and caches the result. Returns a trace hash or nil.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 47

# Look a trace up by id, consulting the hot tier before the database.
#
# A hot-tier hit is returned immediately. Otherwise the row is read through
# traces_ds, deserialized, written back to the hot tier (when available) and
# returned.
#
# @param trace_id [Object] id matched against the trace_id column
# @return [Object, nil] the trace; nil when it is absent, the DB is not
#   ready, or an error was logged
def retrieve(trace_id)
  scope = { tenant_id: @tenant_id, agent_id: @agent_id }

  if HotTier.available?
    hit = HotTier.fetch_trace(trace_id, **scope)
    return hit if hit
  end
  return nil unless db_ready?

  row = traces_ds.where(trace_id: trace_id).first
  trace = deserialize_trace(row) if row
  HotTier.cache_trace(trace, **scope) if HotTier.available? && trace
  trace
rescue StandardError => e
  log_warn("retrieve failed: #{e.message}")
  nil
end

#retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object

Retrieve traces whose domain_tags column contains the given tag string.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 83

# Fetch traces whose domain_tags column contains +tag+ as a substring,
# strongest first.
#
# The tag is matched literally: LIKE metacharacters in it (%, _ and \) are
# escaped before the pattern is built, so "user_profile" no longer also
# matches "userXprofile" and a tag of "%" no longer matches every row.
#
# @param tag [#to_s] text to look for inside domain_tags
# @param min_strength [Numeric] lower bound applied to the strength column
# @param limit [Integer] maximum number of rows returned
# @return [Array] deserialized traces; [] when the DB is not ready or on error
def retrieve_by_domain(tag, min_strength: 0.0, limit: 50)
  return [] unless db_ready?

  scope   = traces_ds
  pattern = "%#{scope.escape_like(tag.to_s)}%"
  rows = scope
         .where(Sequel.like(:domain_tags, pattern))
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_domain failed: #{e.message}")
  []
end

#retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object

Retrieve traces by type, ordered by strength descending.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 67

# Fetch traces of one type, strongest first.
#
# @param type [#to_s] compared against the trace_type column as a string
# @param limit [Integer] maximum number of rows returned
# @param min_strength [Numeric] lower bound applied to the strength column
# @return [Array] deserialized traces; [] when the DB is not ready or on error
def retrieve_by_type(type, limit: 100, min_strength: 0.0)
  return [] unless db_ready?

  of_type      = traces_ds.where(trace_type: type.to_s)
  strong       = of_type.where { strength >= min_strength }
  ranked       = strong.order(Sequel.desc(:strength))
  ranked.limit(limit).all.map { |row| deserialize_trace(row) }
rescue StandardError => e
  log_warn("retrieve_by_type failed: #{e.message}")
  []
end

#store(trace) ⇒ Object

Store (upsert) a trace by trace_id. Returns the trace_id on success, nil if the DB is not ready.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 27

# Upsert a trace keyed on trace_id, then refresh its hot-tier entry.
#
# On conflict every serialized column except trace_id is overwritten. The
# :mysql2 adapter gets the conflict clause without a :target; every other
# adapter gets trace_id as the conflict target.
#
# @param trace [Hash] trace to persist; must respond to [:trace_id]
# @return [Object, nil] trace[:trace_id] on success; nil when the DB is not
#   ready or an error was logged
def store(trace)
  return nil unless db_ready?

  row       = serialize_trace(trace)
  table     = db[TRACES_TABLE]
  overwrite = { update: row.except(:trace_id) }
  overwrite = { target: :trace_id }.merge(overwrite) unless db.adapter_scheme == :mysql2
  table.insert_conflict(overwrite).insert(row)

  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  trace[:trace_id]
rescue StandardError => e
  log_warn("store failed: #{e.message}")
  nil
end

#update(trace_id, **fields) ⇒ Object

Partial update of a trace by trace_id. Evicts the hot-tier entry so a stale cached version cannot be served.



124
125
126
127
128
129
130
131
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 124

# Apply a partial update to the row with the given trace_id, then drop the
# matching hot-tier entry so the next read comes from the database.
#
# @param trace_id [Object] id matched against the trace_id column
# @param fields [Hash] attributes to change; translated by map_update_fields
# @return [Object, nil] not meaningful; nil when the DB is not ready. Errors
#   are logged through log_warn, not raised.
def update(trace_id, **fields)
  return unless db_ready?

  target_row = db[TRACES_TABLE].where(trace_id: trace_id)
  target_row.update(map_update_fields(fields))

  return unless HotTier.available?

  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id)
rescue StandardError => e
  log_warn("update failed: #{e.message}")
end

#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object

BFS traversal starting from start_id. Returns an array of { trace_id:, depth:, path: } hashes.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 183

# Breadth-first walk over the association graph, starting at start_id.
#
# A neighbour is followed only if a row for it is visible through traces_ds
# with strength >= min_strength. Each trace is reported once, at the depth
# where it was first reached. The start trace itself is not part of the result.
#
# Eligibility of all unvisited neighbours of a node is resolved with a single
# query instead of one query per neighbour.
#
# @param start_id [Object] trace id to start from; must exist in traces_ds
# @param max_hops [Integer] greatest depth reported; values <= 0 yield []
# @param min_strength [Numeric] lower bound on a neighbour's strength column
# @return [Array<Hash>] entries of the form { trace_id:, depth:, path: } in
#   discovery order; [] when the DB is not ready, the start trace is missing,
#   or an error was logged
def walk_associations(start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless db_ready?
  # Zero (or negative) hops means nothing beyond the start node is reachable.
  return [] unless max_hops.positive?

  start_row = traces_ds.where(trace_id: start_id).first
  return [] unless start_row

  results = []
  visited = Set.new([start_id])
  queue   = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift
    candidates = associations_for(current_id).reject { |nid| visited.include?(nid) }
    next if candidates.empty?

    # One round-trip per expanded node: which candidates are strong enough?
    reachable = traces_ds
                .where(trace_id: candidates)
                .where { strength >= min_strength }
                .select_map(:trace_id)
                .to_set

    candidates.each do |nid|
      # add? returns nil for an id already seen, guarding against duplicates.
      next unless reachable.include?(nid) && visited.add?(nid)

      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
rescue StandardError => e
  log_warn("walk_associations failed: #{e.message}")
  []
end