Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::Store

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/extensions/agentic/memory/trace/helpers/store.rb

Overview

In-memory store for development and testing. Production deployments should use a PostgreSQL + Redis backed store.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(partition_id: nil) ⇒ Store

Returns a new instance of Store.



16
17
18
19
20
21
22
23
24
25
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 16

def initialize(partition_id: nil)
  @mutex = Mutex.new
  @traces = {}
  @associations = Hash.new { |h, k| h[k] = Hash.new(0) }
  @partition_id = partition_id || resolve_partition_id
  @traces_dirty = false
  @associations_dirty = false
  @persisted_trace_rows = {}
  load_from_local
end

Instance Attribute Details

#associationsObject (readonly)

Returns the value of attribute associations.



14
15
16
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 14

def associations
  @associations
end

#tracesObject (readonly)

Returns the value of attribute traces.



14
15
16
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 14

def traces
  @traces
end

Instance Method Details

#all_traces(min_strength: 0.0) ⇒ Object



93
94
95
96
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 93

def all_traces(min_strength: 0.0)
  snapshot = @mutex.synchronize { @traces.values }
  snapshot.select { |t| t[:strength] >= min_strength }
end

#clear_dirty_flags(trace_rows_snapshot) ⇒ Object



214
215
216
217
218
219
220
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 214

def clear_dirty_flags(trace_rows_snapshot)
  @mutex.synchronize do
    @traces_dirty = false
    @associations_dirty = false
    @persisted_trace_rows = trace_rows_snapshot
  end
end

#countObject



98
99
100
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 98

def count
  @mutex.synchronize { @traces.size }
end

#delete(trace_id) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 41

def delete(trace_id)
  @mutex.synchronize do
    removed_trace = @traces.delete(trace_id)
    @traces.each_value { |trace| trace[:associated_traces]&.delete(trace_id) }

    removed_links = @associations.delete(trace_id)
    @associations.each_value { |links| links.delete(trace_id) }

    @traces_dirty = true if removed_trace
    @associations_dirty = true if removed_trace || removed_links
  end
end

#firmware_tracesObject



104
105
106
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 104

def firmware_traces
  retrieve_by_type(:firmware)
end

#flushObject



108
109
110
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 108

def flush
  save_to_local
end

#get(trace_id) ⇒ Object



37
38
39
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 37

def get(trace_id)
  @mutex.synchronize { @traces[trace_id] }
end

#load_from_localObject



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 222

def load_from_local
  return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected?
  return unless Legion::Data::Local.connection.table_exists?(:memory_traces)

  db = Legion::Data::Local.connection

  db[:memory_traces].where(partition_id: @partition_id).each do |row|
    @traces[row[:trace_id]] = deserialize_trace_from_db(row)
  end

  trace_ids = @traces.keys
  unless trace_ids.empty?
    db[:memory_associations].where(trace_id_a: trace_ids).each do |row|
      @associations[row[:trace_id_a]] ||= {}
      @associations[row[:trace_id_a]][row[:trace_id_b]] = row[:coactivation_count]
    end
  end

  @persisted_trace_rows = @traces.transform_values { |trace| serialize_trace_for_db(trace) }
  @traces_dirty = false
  @associations_dirty = false
end

#persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 201

def persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty)
  assoc_scope_ids = (scoped_trace_ids + memory_trace_ids).uniq
  return unless (dirty || !stale_ids.empty?) && !assoc_scope_ids.empty?

  db[:memory_associations].where(trace_id_a: assoc_scope_ids).delete
  db[:memory_associations].where(trace_id_b: assoc_scope_ids).delete
  associations_snapshot.each do |id_a, targets|
    targets.each do |id_b, count|
      db[:memory_associations].insert(trace_id_a: id_a, trace_id_b: id_b, coactivation_count: count)
    end
  end
end

#persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 189

def persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty)
  return [] unless traces_dirty

  ds = db[:memory_traces]
  trace_rows_snapshot.each_value do |row|
    ds.insert_conflict(:replace).insert(row)
  end
  stale_ids = scoped_trace_ids - memory_trace_ids
  db[:memory_traces].where(trace_id: stale_ids).delete unless stale_ids.empty?
  stale_ids
end

#record_coactivation(trace_id_a, trace_id_b) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 79

def record_coactivation(trace_id_a, trace_id_b)
  return if trace_id_a == trace_id_b

  @mutex.synchronize do
    @associations[trace_id_a][trace_id_b] += 1
    @associations[trace_id_b][trace_id_a] += 1
    @associations_dirty = true

    threshold = Helpers::Trace::COACTIVATION_THRESHOLD
    @traces_dirty = true if @associations[trace_id_a][trace_id_b] >= threshold &&
                            link_traces(trace_id_a, trace_id_b)
  end
end

#restore_traces(traces) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 112

def restore_traces(traces)
  snapshot = Array(traces).each_with_object({}) do |trace, memo|
    next unless trace.is_a?(Hash) && trace[:trace_id]

    restored = trace.dup
    restored[:partition_id] ||= @partition_id
    memo[restored[:trace_id]] = restored
  end

  @mutex.synchronize do
    @traces = snapshot
    @associations = Hash.new { |h, k| h[k] = Hash.new(0) }
    @traces_dirty = true
    @associations_dirty = true
  end
  flush
end

#retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 68

def retrieve_associated(trace_id, min_strength: 0.0, limit: 20)
  trace = @traces[trace_id]
  return [] unless trace

  trace[:associated_traces]
    .filter_map { |id| @traces[id] }
    .select { |t| t[:strength] >= min_strength }
    .sort_by { |t| -t[:strength] }
    .first(limit)
end

#retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object



61
62
63
64
65
66
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 61

def retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100)
  snapshot = @mutex.synchronize { @traces.values }
  snapshot.select { |t| t[:domain_tags].include?(domain_tag) && t[:strength] >= min_strength }
          .sort_by { |t| -t[:strength] }
          .first(limit)
end

#retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object



54
55
56
57
58
59
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 54

def retrieve_by_type(type, min_strength: 0.0, limit: 100)
  snapshot = @mutex.synchronize { @traces.values }
  snapshot.select { |t| t[:trace_type] == type && t[:strength] >= min_strength }
          .sort_by { |t| -t[:strength] }
          .first(limit)
end

#save_to_localObject



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 159

def save_to_local
  return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected?
  return unless Legion::Data::Local.connection.table_exists?(:memory_traces)

  db = Legion::Data::Local.connection
  snapshots = snapshot_dirty_state
  return unless snapshots

  traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty = snapshots
  db.transaction do
    scoped_trace_ids = db[:memory_traces].where(partition_id: @partition_id).select_map(:trace_id)
    memory_trace_ids = traces_snapshot.keys
    stale_ids = persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty)
    persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, associations_dirty)
  end
  clear_dirty_flags(trace_rows_snapshot)
end

#snapshot_dirty_stateObject



177
178
179
180
181
182
183
184
185
186
187
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 177

def snapshot_dirty_state
  traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty = @mutex.synchronize do
    ts = @traces.transform_values(&:dup)
    as = @associations.each_with_object({}) { |(tid, targets), memo| memo[tid] = targets.dup }
    trs = ts.transform_values { |trace| serialize_trace_for_db(trace) }
    [ts, as, trs, @traces_dirty || trs != @persisted_trace_rows, @associations_dirty]
  end
  return nil unless traces_dirty || associations_dirty

  [traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty]
end

#store(trace) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 27

def store(trace)
  persisted_trace = trace.dup
  persisted_trace[:partition_id] ||= @partition_id
  @mutex.synchronize do
    @traces_dirty = true if @traces[persisted_trace[:trace_id]] != persisted_trace
    @traces[persisted_trace[:trace_id]] = persisted_trace
  end
  persisted_trace[:trace_id]
end

#synchronizeObject



102
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 102

def synchronize(&) = @mutex.synchronize(&)

#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 130

def walk_associations(start_id:, max_hops: 12, min_strength: 0.1)
  snapshot = @mutex.synchronize { @traces.dup }
  return [] unless snapshot.key?(start_id)

  results  = []
  visited  = Set.new([start_id])
  queue    = [[start_id, 0, [start_id]]]

  until queue.empty?
    current_id, depth, path = queue.shift
    next unless (current = snapshot[current_id])

    current[:associated_traces].each do |neighbor_id|
      next if visited.include?(neighbor_id)

      neighbor = snapshot[neighbor_id]
      next unless neighbor
      next unless neighbor[:strength] >= min_strength

      visited << neighbor_id
      neighbor_path = path + [neighbor_id]
      results << { trace_id: neighbor_id, depth: depth + 1, path: neighbor_path }
      queue << [neighbor_id, depth + 1, neighbor_path] if depth + 1 < max_hops
    end
  end

  results
end