Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Inherits:
-
Object
- Object
- Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/agentic/memory/trace/helpers/store.rb
Overview
In-memory store for development and testing. Production deployments should use a PostgreSQL + Redis backed store.
Constant Summary collapse
- ASSOCIATION_LOAD_BATCH_SIZE =
500
Instance Attribute Summary collapse
-
#associations ⇒ Object
readonly
Returns the value of attribute associations.
-
#traces ⇒ Object
readonly
Returns the value of attribute traces.
Instance Method Summary collapse
- #all_traces(min_strength: 0.0) ⇒ Object
- #clear_dirty_flags(trace_rows_snapshot) ⇒ Object
- #count ⇒ Object
- #delete(trace_id) ⇒ Object
- #firmware_traces ⇒ Object
- #flush ⇒ Object
- #get(trace_id) ⇒ Object
-
#initialize(partition_id: nil) ⇒ Store
constructor
A new instance of Store.
- #load_from_local ⇒ Object
- #persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
- #persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
- #record_coactivation(trace_id_a, trace_id_b) ⇒ Object
- #restore_traces(traces) ⇒ Object
- #retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
- #retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
- #retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
- #save_to_local ⇒ Object
- #snapshot_dirty_state ⇒ Object
- #store(trace) ⇒ Object
- #synchronize ⇒ Object
- #walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
Constructor Details
#initialize(partition_id: nil) ⇒ Store
Returns a new instance of Store.
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 20 def initialize(partition_id: nil) @mutex = Mutex.new @traces = {} @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @partition_id = partition_id || resolve_partition_id @traces_dirty = false @associations_dirty = false @persisted_trace_rows = {} load_from_local end |
Instance Attribute Details
#associations ⇒ Object (readonly)
Returns the value of attribute associations.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def associations @associations end |
#traces ⇒ Object (readonly)
Returns the value of attribute traces.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def traces @traces end |
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
100 101 102 103 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 100 def all_traces(min_strength: 0.0) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:strength] >= min_strength } end |
#clear_dirty_flags(trace_rows_snapshot) ⇒ Object
233 234 235 236 237 238 239 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 233 def clear_dirty_flags(trace_rows_snapshot) @mutex.synchronize do @traces_dirty = false @associations_dirty = false @persisted_trace_rows = trace_rows_snapshot end end |
#count ⇒ Object
105 106 107 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 105 def count @mutex.synchronize { @traces.size } end |
#delete(trace_id) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 45 def delete(trace_id) @mutex.synchronize do removed_trace = @traces.delete(trace_id) @traces.each_value { |trace| trace[:associated_traces]&.delete(trace_id) } removed_links = @associations.delete(trace_id) @associations.each_value { |links| links.delete(trace_id) } @traces_dirty = true if removed_trace @associations_dirty = true if removed_trace || removed_links end end |
#firmware_traces ⇒ Object
111 112 113 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 111 def firmware_traces retrieve_by_type(:firmware) end |
#flush ⇒ Object
115 116 117 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 115 def flush save_to_local end |
#get(trace_id) ⇒ Object
41 42 43 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 41 def get(trace_id) @mutex.synchronize { @traces[trace_id] } end |
#load_from_local ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 241 def load_from_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection db[:memory_traces].where(partition_id: @partition_id).each do |row| @traces[row[:trace_id]] = deserialize_trace_from_db(row) end load_local_associations(db) @persisted_trace_rows = @traces.transform_values { |trace| serialize_trace_for_db(trace) } @traces_dirty = false @associations_dirty = false end |
#persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 209 def persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) assoc_scope_ids = (scoped_trace_ids + memory_trace_ids).uniq return unless (dirty || !stale_ids.empty?) && !assoc_scope_ids.empty? association_columns = db[:memory_associations].columns partitioned_associations = association_columns.include?(:partition_id) if partitioned_associations db[:memory_associations].where(partition_id: @partition_id).delete else assoc_scope_ids.each_slice(ASSOCIATION_LOAD_BATCH_SIZE) do |ids| db[:memory_associations].where(trace_id_a: ids).delete db[:memory_associations].where(trace_id_b: ids).delete end end associations_snapshot.each do |id_a, targets| targets.each do |id_b, count| row = { trace_id_a: id_a, trace_id_b: id_b, coactivation_count: count } row[:partition_id] = @partition_id if partitioned_associations db[:memory_associations].insert(row) end end end |
#persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
199 200 201 202 203 204 205 206 207 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 199 def persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) return unless trace_changes[:dirty] || !stale_ids.empty? ds = db[:memory_traces] trace_changes[:changed_ids].each do |trace_id| ds.insert_conflict(:replace).insert(trace_rows_snapshot.fetch(trace_id)) end db[:memory_traces].where(trace_id: stale_ids).delete unless stale_ids.empty? end |
#record_coactivation(trace_id_a, trace_id_b) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 86 def record_coactivation(trace_id_a, trace_id_b) return if trace_id_a == trace_id_b @mutex.synchronize do @associations[trace_id_a][trace_id_b] += 1 @associations[trace_id_b][trace_id_a] += 1 @associations_dirty = true threshold = Helpers::Trace::COACTIVATION_THRESHOLD @traces_dirty = true if @associations[trace_id_a][trace_id_b] >= threshold && link_traces(trace_id_a, trace_id_b) end end |
#restore_traces(traces) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 119 def restore_traces(traces) snapshot = Array(traces).each_with_object({}) do |trace, memo| next unless trace.is_a?(Hash) && trace[:trace_id] restored = trace.dup restored[:partition_id] ||= @partition_id memo[restored[:trace_id]] = restored end @mutex.synchronize do @traces = snapshot @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @traces_dirty = true @associations_dirty = true end flush end |
#retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 72 def retrieve_associated(trace_id, min_strength: 0.0, limit: 20) associated = @mutex.synchronize do trace = @traces[trace_id] next [] unless trace trace[:associated_traces].filter_map { |id| @traces[id]&.dup } end associated .select { |t| t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
65 66 67 68 69 70 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 65 def retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:domain_tags].include?(domain_tag) && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 58 def retrieve_by_type(type, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:trace_type] == type && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#save_to_local ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 166 def save_to_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection snapshots = snapshot_dirty_state return unless snapshots traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty = snapshots db.transaction do scoped_trace_ids = db[:memory_traces].where(partition_id: @partition_id).select_map(:trace_id) memory_trace_ids = traces_snapshot.keys stale_ids = scoped_trace_ids - memory_trace_ids persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, associations_dirty) end clear_dirty_flags(trace_rows_snapshot) end |
#snapshot_dirty_state ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 185 def snapshot_dirty_state traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty = @mutex.synchronize do ts = @traces.transform_values(&:dup) as = @associations.each_with_object({}) { |(tid, targets), memo| memo[tid] = targets.dup } trs = ts.transform_values { |trace| serialize_trace_for_db(trace) } changed_trace_ids = trs.each_key.reject { |trace_id| trs[trace_id] == @persisted_trace_rows[trace_id] } trace_changes = { dirty: @traces_dirty || changed_trace_ids.any?, changed_ids: changed_trace_ids } [ts, as, trs, trace_changes, @associations_dirty] end return nil unless trace_changes[:dirty] || associations_dirty [traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty] end |
#store(trace) ⇒ Object
31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 31 def store(trace) persisted_trace = trace.dup persisted_trace[:partition_id] ||= @partition_id @mutex.synchronize do @traces_dirty = true if @traces[persisted_trace[:trace_id]] != persisted_trace @traces[persisted_trace[:trace_id]] = persisted_trace end persisted_trace[:trace_id] end |
#synchronize ⇒ Object
109 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 109 def synchronize(&) = @mutex.synchronize(&) |
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 137 def walk_associations(start_id:, max_hops: 12, min_strength: 0.1) snapshot = @mutex.synchronize { @traces.dup } return [] unless snapshot.key?(start_id) results = [] visited = Set.new([start_id]) queue = [[start_id, 0, [start_id]]] until queue.empty? current_id, depth, path = queue.shift next unless (current = snapshot[current_id]) current[:associated_traces].each do |neighbor_id| next if visited.include?(neighbor_id) neighbor = snapshot[neighbor_id] next unless neighbor next unless neighbor[:strength] >= min_strength visited << neighbor_id neighbor_path = path + [neighbor_id] results << { trace_id: neighbor_id, depth: depth + 1, path: neighbor_path } queue << [neighbor_id, depth + 1, neighbor_path] if depth + 1 < max_hops end end results end |