Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Inherits:
-
Object
- Object
- Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Defined in:
- lib/legion/extensions/agentic/memory/trace/helpers/store.rb
Overview
In-memory store for development and testing. Production deployments should use a PostgreSQL + Redis backed store.
Instance Attribute Summary collapse
-
#associations ⇒ Object
readonly
Returns the value of attribute associations.
-
#traces ⇒ Object
readonly
Returns the value of attribute traces.
Instance Method Summary collapse
- #all_traces(min_strength: 0.0) ⇒ Object
- #clear_dirty_flags(trace_rows_snapshot) ⇒ Object
- #count ⇒ Object
- #delete(trace_id) ⇒ Object
- #firmware_traces ⇒ Object
- #flush ⇒ Object
- #get(trace_id) ⇒ Object
-
#initialize(partition_id: nil) ⇒ Store
constructor
A new instance of Store.
- #load_from_local ⇒ Object
- #persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
- #persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty) ⇒ Object
- #record_coactivation(trace_id_a, trace_id_b) ⇒ Object
- #restore_traces(traces) ⇒ Object
- #retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
- #retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
- #retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
- #save_to_local ⇒ Object
- #snapshot_dirty_state ⇒ Object
- #store(trace) ⇒ Object
- #synchronize ⇒ Object
- #walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
Constructor Details
#initialize(partition_id: nil) ⇒ Store
Returns a new instance of Store.
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 16 def initialize(partition_id: nil) @mutex = Mutex.new @traces = {} @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @partition_id = partition_id || resolve_partition_id @traces_dirty = false @associations_dirty = false @persisted_trace_rows = {} load_from_local end |
Instance Attribute Details
#associations ⇒ Object (readonly)
Returns the value of attribute associations.
14 15 16 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 14 def associations @associations end |
#traces ⇒ Object (readonly)
Returns the value of attribute traces.
14 15 16 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 14 def traces @traces end |
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
93 94 95 96 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 93 def all_traces(min_strength: 0.0) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:strength] >= min_strength } end |
#clear_dirty_flags(trace_rows_snapshot) ⇒ Object
214 215 216 217 218 219 220 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 214 def clear_dirty_flags(trace_rows_snapshot) @mutex.synchronize do @traces_dirty = false @associations_dirty = false @persisted_trace_rows = trace_rows_snapshot end end |
#count ⇒ Object
98 99 100 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 98 def count @mutex.synchronize { @traces.size } end |
#delete(trace_id) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 41 def delete(trace_id) @mutex.synchronize do removed_trace = @traces.delete(trace_id) @traces.each_value { |trace| trace[:associated_traces]&.delete(trace_id) } removed_links = @associations.delete(trace_id) @associations.each_value { |links| links.delete(trace_id) } @traces_dirty = true if removed_trace @associations_dirty = true if removed_trace || removed_links end end |
#firmware_traces ⇒ Object
104 105 106 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 104 def firmware_traces retrieve_by_type(:firmware) end |
#flush ⇒ Object
108 109 110 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 108 def flush save_to_local end |
#get(trace_id) ⇒ Object
37 38 39 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 37 def get(trace_id) @mutex.synchronize { @traces[trace_id] } end |
#load_from_local ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 222 def load_from_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection db[:memory_traces].where(partition_id: @partition_id).each do |row| @traces[row[:trace_id]] = deserialize_trace_from_db(row) end trace_ids = @traces.keys unless trace_ids.empty? db[:memory_associations].where(trace_id_a: trace_ids).each do |row| @associations[row[:trace_id_a]] ||= {} @associations[row[:trace_id_a]][row[:trace_id_b]] = row[:coactivation_count] end end @persisted_trace_rows = @traces.transform_values { |trace| serialize_trace_for_db(trace) } @traces_dirty = false @associations_dirty = false end |
#persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 201 def persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) assoc_scope_ids = (scoped_trace_ids + memory_trace_ids).uniq return unless (dirty || !stale_ids.empty?) && !assoc_scope_ids.empty? db[:memory_associations].where(trace_id_a: assoc_scope_ids).delete db[:memory_associations].where(trace_id_b: assoc_scope_ids).delete associations_snapshot.each do |id_a, targets| targets.each do |id_b, count| db[:memory_associations].insert(trace_id_a: id_a, trace_id_b: id_b, coactivation_count: count) end end end |
#persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 189 def persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty) return [] unless traces_dirty ds = db[:memory_traces] trace_rows_snapshot.each_value do |row| ds.insert_conflict(:replace).insert(row) end stale_ids = scoped_trace_ids - memory_trace_ids db[:memory_traces].where(trace_id: stale_ids).delete unless stale_ids.empty? stale_ids end |
#record_coactivation(trace_id_a, trace_id_b) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 79 def record_coactivation(trace_id_a, trace_id_b) return if trace_id_a == trace_id_b @mutex.synchronize do @associations[trace_id_a][trace_id_b] += 1 @associations[trace_id_b][trace_id_a] += 1 @associations_dirty = true threshold = Helpers::Trace::COACTIVATION_THRESHOLD @traces_dirty = true if @associations[trace_id_a][trace_id_b] >= threshold && link_traces(trace_id_a, trace_id_b) end end |
#restore_traces(traces) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 112 def restore_traces(traces) snapshot = Array(traces).each_with_object({}) do |trace, memo| next unless trace.is_a?(Hash) && trace[:trace_id] restored = trace.dup restored[:partition_id] ||= @partition_id memo[restored[:trace_id]] = restored end @mutex.synchronize do @traces = snapshot @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @traces_dirty = true @associations_dirty = true end flush end |
#retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 68 def retrieve_associated(trace_id, min_strength: 0.0, limit: 20) trace = @traces[trace_id] return [] unless trace trace[:associated_traces] .filter_map { |id| @traces[id] } .select { |t| t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
61 62 63 64 65 66 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 61 def retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:domain_tags].include?(domain_tag) && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
54 55 56 57 58 59 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 54 def retrieve_by_type(type, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:trace_type] == type && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#save_to_local ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 159 def save_to_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection snapshots = snapshot_dirty_state return unless snapshots traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty = snapshots db.transaction do scoped_trace_ids = db[:memory_traces].where(partition_id: @partition_id).select_map(:trace_id) memory_trace_ids = traces_snapshot.keys stale_ids = persist_dirty_traces(db, trace_rows_snapshot, scoped_trace_ids, memory_trace_ids, traces_dirty) persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, associations_dirty) end clear_dirty_flags(trace_rows_snapshot) end |
#snapshot_dirty_state ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 177 def snapshot_dirty_state traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty = @mutex.synchronize do ts = @traces.transform_values(&:dup) as = @associations.each_with_object({}) { |(tid, targets), memo| memo[tid] = targets.dup } trs = ts.transform_values { |trace| serialize_trace_for_db(trace) } [ts, as, trs, @traces_dirty || trs != @persisted_trace_rows, @associations_dirty] end return nil unless traces_dirty || associations_dirty [traces_snapshot, associations_snapshot, trace_rows_snapshot, traces_dirty, associations_dirty] end |
#store(trace) ⇒ Object
27 28 29 30 31 32 33 34 35 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 27 def store(trace) persisted_trace = trace.dup persisted_trace[:partition_id] ||= @partition_id @mutex.synchronize do @traces_dirty = true if @traces[persisted_trace[:trace_id]] != persisted_trace @traces[persisted_trace[:trace_id]] = persisted_trace end persisted_trace[:trace_id] end |
#synchronize ⇒ Object
102 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 102 def synchronize(&) = @mutex.synchronize(&) |
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 130 def walk_associations(start_id:, max_hops: 12, min_strength: 0.1) snapshot = @mutex.synchronize { @traces.dup } return [] unless snapshot.key?(start_id) results = [] visited = Set.new([start_id]) queue = [[start_id, 0, [start_id]]] until queue.empty? current_id, depth, path = queue.shift next unless (current = snapshot[current_id]) current[:associated_traces].each do |neighbor_id| next if visited.include?(neighbor_id) neighbor = snapshot[neighbor_id] next unless neighbor next unless neighbor[:strength] >= min_strength visited << neighbor_id neighbor_path = path + [neighbor_id] results << { trace_id: neighbor_id, depth: depth + 1, path: neighbor_path } queue << [neighbor_id, depth + 1, neighbor_path] if depth + 1 < max_hops end end results end |