Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::PostgresStore

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb

Overview

Write-through durable store backed by Legion::Data (PostgreSQL or MySQL). All writes go directly to the database — no in-memory dirty tracking, no flush. Scoped by tenant_id and agent_id so multiple agents can share the same DB tables safely.

Constant Summary collapse

TRACES_TABLE =
:memory_traces
ASSOCIATIONS_TABLE =
:memory_associations

Instance Method Summary collapse

Constructor Details

#initialize(tenant_id: nil, agent_id: nil) ⇒ PostgresStore

Returns a new instance of PostgresStore.



18
19
20
21
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 18

def initialize(tenant_id: nil, agent_id: nil)
  @tenant_id = tenant_id
  @agent_id  = agent_id || resolve_agent_id
end

Instance Method Details

#all_traces(min_strength: 0.0) ⇒ Object

Return all traces for this tenant.



97
98
99
100
101
102
103
104
105
106
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 97

def all_traces(min_strength: 0.0)
  return [] unless db_ready?

  ds = traces_ds
  ds = ds.where { strength >= min_strength } if min_strength > 0.0
  ds.all.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("all_traces failed: #{e.message}")
  []
end

#associations_for(trace_id) ⇒ Object

Return the set of trace IDs associated with a given trace (bidirectional).



164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 164

def associations_for(trace_id)
  return [] unless db_ready?

  a_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_a: trace_id)
           .select_map(:trace_id_b)
  b_side = db[ASSOCIATIONS_TABLE]
           .where(trace_id_b: trace_id)
           .select_map(:trace_id_a)
  (a_side + b_side).uniq
rescue StandardError => e
  log_warn("associations_for failed: #{e.message}")
  []
end

#countObject



252
253
254
255
256
257
258
259
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 252

def count
  return 0 unless db_ready?

  traces_ds.count
rescue StandardError => e
  log_warn("count failed: #{e.message}")
  0
end

#db_ready?Boolean

Returns true when both required tables exist in the connected DB.

Returns:

  • (Boolean)


265
266
267
268
269
270
271
272
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 265

def db_ready?
  defined?(Legion::Data) &&
    Legion::Data.respond_to?(:connection) &&
    Legion::Data.connection&.table_exists?(TRACES_TABLE) &&
    Legion::Data.connection.table_exists?(ASSOCIATIONS_TABLE)
rescue StandardError => _e
  false
end

#delete(trace_id) ⇒ Object

Delete a trace and its association rows.



109
110
111
112
113
114
115
116
117
118
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 109

def delete(trace_id)
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  return unless db_ready?

  db[ASSOCIATIONS_TABLE].where(trace_id_a: trace_id).delete
  db[ASSOCIATIONS_TABLE].where(trace_id_b: trace_id).delete
  db[TRACES_TABLE].where(trace_id: trace_id).delete
rescue StandardError => e
  log_warn("delete failed: #{e.message}")
end

#delete_least_recently_used(trace_type:, count:) ⇒ Object

Delete the N least-recently-used traces for a given type (quota enforcement).



233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 233

def delete_least_recently_used(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:last_reinforced)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_least_recently_used failed: #{e.message}")
end

#delete_lowest_confidence(trace_type:, count:) ⇒ Object

Delete the N traces with the lowest confidence for a given type (quota enforcement).



218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 218

def delete_lowest_confidence(trace_type:, count:)
  return unless db_ready?

  ids = traces_ds
        .where(trace_type: trace_type.to_s)
        .order(:confidence)
        .limit(count)
        .select_map(:trace_id)

  ids.each { |tid| delete(tid) }
rescue StandardError => e
  log_warn("delete_lowest_confidence failed: #{e.message}")
end

#firmware_tracesObject

Convenience: retrieve firmware-type traces.



248
249
250
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 248

def firmware_traces
  retrieve_by_type(:firmware)
end

#flushObject

No-op — this store is write-through; nothing to flush.



262
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 262

def flush; end

#record_coactivation(id_a, id_b) ⇒ Object

Create or increment a coactivation association between two traces.



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 132

def record_coactivation(id_a, id_b)
  return unless db_ready?
  return if id_a == id_b

  now = Time.now.utc
  existing = db[ASSOCIATIONS_TABLE]
             .where(trace_id_a: id_a, trace_id_b: id_b)
             .first

  if existing
    db[ASSOCIATIONS_TABLE]
      .where(id: existing[:id])
      .update(
        coactivation_count: existing[:coactivation_count] + 1,
        updated_at:         now
      )
  else
    db[ASSOCIATIONS_TABLE].insert(
      trace_id_a:         id_a,
      trace_id_b:         id_b,
      coactivation_count: 1,
      linked:             false,
      tenant_id:          @tenant_id,
      created_at:         now,
      updated_at:         now
    )
  end
rescue StandardError => e
  log_warn("record_coactivation failed: #{e.message}")
end

#retrieve(trace_id) ⇒ Object Also known as: get

Retrieve a single trace by trace_id (tenant/agent scoped). Checks the Redis hot tier first; falls through to DB on a miss and caches the result. Returns a trace hash or nil.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 45

def retrieve(trace_id)
  if HotTier.available?
    cached = HotTier.fetch_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id)
    return cached if cached
  end

  return nil unless db_ready?

  row = traces_ds.where(trace_id: trace_id).first
  trace = row ? deserialize_trace(row) : nil
  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available? && trace
  trace
rescue StandardError => e
  log_warn("retrieve failed: #{e.message}")
  nil
end

#retrieve_by_domain(tag, min_strength: 0.0, limit: 50) ⇒ Object

Retrieve traces whose domain_tags column contains the given tag string.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 81

def retrieve_by_domain(tag, min_strength: 0.0, limit: 50)
  return [] unless db_ready?

  rows = traces_ds
         .where(Sequel.like(:domain_tags, "%#{tag}%"))
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_domain failed: #{e.message}")
  []
end

#retrieve_by_type(type, limit: 100, min_strength: 0.0) ⇒ Object

Retrieve traces by type, ordered by strength descending.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 65

def retrieve_by_type(type, limit: 100, min_strength: 0.0)
  return [] unless db_ready?

  rows = traces_ds
         .where(trace_type: type.to_s)
         .where { strength >= min_strength }
         .order(Sequel.desc(:strength))
         .limit(limit)
         .all
  rows.map { |r| deserialize_trace(r) }
rescue StandardError => e
  log_warn("retrieve_by_type failed: #{e.message}")
  []
end

#store(trace) ⇒ Object

Store (upsert) a trace by trace_id. Returns the trace_id on success, nil if the DB is not ready.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 25

def store(trace)
  return nil unless db_ready?

  row = serialize_trace(trace)
  ds  = db[TRACES_TABLE]
  if db.adapter_scheme == :mysql2
    ds.insert_conflict(update: row.except(:trace_id)).insert(row)
  else
    ds.insert_conflict(target: :trace_id, update: row.except(:trace_id)).insert(row)
  end
  HotTier.cache_trace(trace, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
  trace[:trace_id]
rescue StandardError => e
  log_warn("store failed: #{e.message}")
  nil
end

#update(trace_id, **fields) ⇒ Object

Partial update of a trace by trace_id. Evicts the hot-tier entry so a stale cached version cannot be served.



122
123
124
125
126
127
128
129
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 122

def update(trace_id, **fields)
  return unless db_ready?

  db[TRACES_TABLE].where(trace_id: trace_id).update(map_update_fields(fields))
  HotTier.evict_trace(trace_id, tenant_id: @tenant_id, agent_id: @agent_id) if HotTier.available?
rescue StandardError => e
  log_warn("update failed: #{e.message}")
end

#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object

BFS traversal starting from start_id. Returns an array of { trace_id:, depth:, path: } hashes.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/legion/extensions/agentic/memory/trace/helpers/postgres_store.rb', line 181

def walk_associations(start_id:, max_hops: 12, min_strength: 0.1)
  return [] unless db_ready?

  start_row = traces_ds.where(trace_id: start_id).first
  return [] unless start_row

  results = []
  visited = Set.new([start_id])
  queue   = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift
    neighbor_ids = associations_for(current_id)

    neighbor_ids.each do |nid|
      next if visited.include?(nid)

      neighbor_row = traces_ds
                     .where(trace_id: nid)
                     .where { strength >= min_strength }
                     .first
      next unless neighbor_row

      visited << nid
      neighbor_path = path + [nid]
      results << { trace_id: nid, depth: depth + 1, path: neighbor_path }
      queue << [nid, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
rescue StandardError => e
  log_warn("walk_associations failed: #{e.message}")
  []
end