Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Inherits:
-
Object
- Object
- Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/agentic/memory/trace/helpers/store.rb
Overview
In-memory store for development and testing. Production deployments should use a PostgreSQL + Redis backed store.
Constant Summary collapse
- ASSOCIATION_LOAD_BATCH_SIZE =
500
Instance Attribute Summary collapse
-
#associations ⇒ Object
readonly
Returns the value of attribute associations.
-
#traces ⇒ Object
readonly
Returns the value of attribute traces.
Instance Method Summary collapse
- #all_traces(min_strength: 0.0) ⇒ Object
- #clear_dirty_flags(trace_rows_snapshot) ⇒ Object
- #count ⇒ Object
- #delete(trace_id) ⇒ Object
- #firmware_traces ⇒ Object
- #flush ⇒ Object
- #get(trace_id) ⇒ Object
-
#initialize(partition_id: nil) ⇒ Store
constructor
A new instance of Store.
- #load_from_local ⇒ Object
- #persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
- #persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
- #record_coactivation(trace_id_a, trace_id_b) ⇒ Object
- #restore_traces(traces) ⇒ Object
- #retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
- #retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
- #retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
- #save_to_local ⇒ Object
- #snapshot_dirty_state ⇒ Object
- #store(trace) ⇒ Object
- #synchronize ⇒ Object
- #walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
Constructor Details
#initialize(partition_id: nil) ⇒ Store
Returns a new instance of Store.
20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 20 def initialize(partition_id: nil) @mutex = Mutex.new @traces = {} @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @partition_id = partition_id || resolve_partition_id @dirty_trace_ids = Set.new @deleted_trace_ids = Set.new @associations_dirty = false @persisted_trace_rows = {} load_from_local end |
Instance Attribute Details
#associations ⇒ Object (readonly)
Returns the value of attribute associations.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def associations @associations end |
#traces ⇒ Object (readonly)
Returns the value of attribute traces.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def traces @traces end |
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
106 107 108 109 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 106 def all_traces(min_strength: 0.0) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:strength] >= min_strength } end |
#clear_dirty_flags(trace_rows_snapshot) ⇒ Object
254 255 256 257 258 259 260 261 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 254 def clear_dirty_flags(trace_rows_snapshot) @mutex.synchronize do @dirty_trace_ids.clear @deleted_trace_ids.clear @associations_dirty = false @persisted_trace_rows = trace_rows_snapshot end end |
#count ⇒ Object
111 112 113 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 111 def count @mutex.synchronize { @traces.size } end |
#delete(trace_id) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 46 def delete(trace_id) @mutex.synchronize do removed_trace = @traces.delete(trace_id) @traces.each_value { |trace| trace[:associated_traces]&.delete(trace_id) } removed_links = @associations.delete(trace_id) @associations.each_value { |links| links.delete(trace_id) } if removed_trace @dirty_trace_ids.delete(trace_id) @deleted_trace_ids << trace_id end @associations_dirty = true if removed_trace || removed_links end end |
#firmware_traces ⇒ Object
117 118 119 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 117 def firmware_traces retrieve_by_type(:firmware) end |
#flush ⇒ Object
121 122 123 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 121 def flush save_to_local end |
#get(trace_id) ⇒ Object
42 43 44 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 42 def get(trace_id) @mutex.synchronize { @traces[trace_id] } end |
#load_from_local ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 263 def load_from_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection db[:memory_traces].where(partition_id: @partition_id).each do |row| @traces[row[:trace_id]] = deserialize_trace_from_db(row) @persisted_trace_rows[row[:trace_id]] = row.dup end load_local_associations(db) @dirty_trace_ids = Set.new @deleted_trace_ids = Set.new @associations_dirty = false end |
#persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 230 def persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) assoc_scope_ids = (scoped_trace_ids + memory_trace_ids).uniq return unless (dirty || !stale_ids.empty?) && !assoc_scope_ids.empty? association_columns = db[:memory_associations].columns partitioned_associations = association_columns.include?(:partition_id) if partitioned_associations db[:memory_associations].where(partition_id: @partition_id).delete else assoc_scope_ids.each_slice(ASSOCIATION_LOAD_BATCH_SIZE) do |ids| db[:memory_associations].where(trace_id_a: ids).delete db[:memory_associations].where(trace_id_b: ids).delete end end associations_snapshot.each do |id_a, targets| targets.each do |id_b, count| row = { trace_id_a: id_a, trace_id_b: id_b, coactivation_count: count } row[:partition_id] = @partition_id if partitioned_associations db[:memory_associations].insert(row) end end end |
#persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 216 def persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) changed_ids = trace_changes[:changed_ids] || [] deleted_ids = trace_changes[:deleted_ids] || [] all_removals = (stale_ids + deleted_ids).uniq return if changed_ids.empty? && all_removals.empty? ds = db[:memory_traces] changed_ids.each do |trace_id| row = trace_rows_snapshot[trace_id] ds.insert_conflict(:replace).insert(row) if row end db[:memory_traces].where(trace_id: all_removals).delete unless all_removals.empty? end |
#record_coactivation(trace_id_a, trace_id_b) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 90 def record_coactivation(trace_id_a, trace_id_b) return if trace_id_a == trace_id_b @mutex.synchronize do @associations[trace_id_a][trace_id_b] += 1 @associations[trace_id_b][trace_id_a] += 1 @associations_dirty = true threshold = Helpers::Trace::COACTIVATION_THRESHOLD if @associations[trace_id_a][trace_id_b] >= threshold && link_traces(trace_id_a, trace_id_b) @dirty_trace_ids << trace_id_a @dirty_trace_ids << trace_id_b end end end |
#restore_traces(traces) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 125 def restore_traces(traces) snapshot = Array(traces).each_with_object({}) do |trace, memo| next unless trace.is_a?(Hash) && trace[:trace_id] restored = Helpers::Trace.normalize_trace_affect(trace) restored[:partition_id] ||= @partition_id memo[restored[:trace_id]] = restored end @mutex.synchronize do @traces = snapshot @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @dirty_trace_ids = Set.new(@traces.keys) @deleted_trace_ids = Set.new @associations_dirty = true end flush end |
#retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 76 def retrieve_associated(trace_id, min_strength: 0.0, limit: 20) associated = @mutex.synchronize do trace = @traces[trace_id] next [] unless trace trace[:associated_traces].filter_map { |id| @traces[id]&.dup } end associated .select { |t| t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 69 def retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:domain_tags].include?(domain_tag) && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
62 63 64 65 66 67 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 62 def retrieve_by_type(type, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:trace_type] == type && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#save_to_local ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 173 def save_to_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection snapshots = snapshot_dirty_state return unless snapshots traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty = snapshots db.transaction do scoped_trace_ids = db[:memory_traces].where(partition_id: @partition_id).select_map(:trace_id) memory_trace_ids = traces_snapshot.keys stale_ids = scoped_trace_ids - memory_trace_ids persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, associations_dirty) end clear_dirty_flags(trace_rows_snapshot) end |
#snapshot_dirty_state ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 192 def snapshot_dirty_state @mutex.synchronize do dirty_ids = @dirty_trace_ids.to_a deleted_ids = @deleted_trace_ids.to_a associations_dirty = @associations_dirty return nil if dirty_ids.empty? && deleted_ids.empty? && !associations_dirty dirty_rows = dirty_ids.each_with_object({}) do |trace_id, h| trace = @traces[trace_id] h[trace_id] = serialize_trace_for_db(trace) if trace end trace_rows_snapshot = @persisted_trace_rows.merge(dirty_rows) deleted_ids.each { |id| trace_rows_snapshot.delete(id) } traces_snapshot = @traces.transform_values(&:dup) as = @associations.each_with_object({}) { |(tid, targets), memo| memo[tid] = targets.dup } trace_changes = { dirty: true, changed_ids: dirty_ids, deleted_ids: deleted_ids } [traces_snapshot, as, trace_rows_snapshot, trace_changes, associations_dirty] end end |
#store(trace) ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 32 def store(trace) persisted_trace = Helpers::Trace.normalize_trace_affect(trace) persisted_trace[:partition_id] ||= @partition_id @mutex.synchronize do @dirty_trace_ids << persisted_trace[:trace_id] @traces[persisted_trace[:trace_id]] = persisted_trace end persisted_trace[:trace_id] end |
#synchronize ⇒ Object
115 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 115 def synchronize(&) = @mutex.synchronize(&) |
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 144 def walk_associations(start_id:, max_hops: 12, min_strength: 0.1) snapshot = @mutex.synchronize { @traces.dup } return [] unless snapshot.key?(start_id) results = [] visited = Set.new([start_id]) queue = [[start_id, 0, [start_id]]] until queue.empty? current_id, depth, path = queue.shift next unless (current = snapshot[current_id]) current[:associated_traces].each do |neighbor_id| next if visited.include?(neighbor_id) neighbor = snapshot[neighbor_id] next unless neighbor next unless neighbor[:strength] >= min_strength visited << neighbor_id neighbor_path = path + [neighbor_id] results << { trace_id: neighbor_id, depth: depth + 1, path: neighbor_path } queue << [neighbor_id, depth + 1, neighbor_path] if depth + 1 < max_hops end end results end |