Class: Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Inherits:
-
Object
- Object
- Legion::Extensions::Agentic::Memory::Trace::Helpers::Store
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/extensions/agentic/memory/trace/helpers/store.rb
Overview
In-memory store for development and testing. Production deployments should use a PostgreSQL + Redis backed store.
Constant Summary collapse
- ASSOCIATION_LOAD_BATCH_SIZE =
500
Instance Attribute Summary collapse
-
#associations ⇒ Object
readonly
Returns the value of attribute associations.
-
#traces ⇒ Object
readonly
Returns the value of attribute traces.
Instance Method Summary collapse
- #all_traces(min_strength: 0.0) ⇒ Object
- #clear_dirty_flags(trace_rows_snapshot) ⇒ Object
- #count ⇒ Object
- #delete(trace_id) ⇒ Object
- #firmware_traces ⇒ Object
- #flush ⇒ Object
- #get(trace_id) ⇒ Object
-
#initialize(partition_id: nil) ⇒ Store
constructor
A new instance of Store.
- #load_from_local ⇒ Object
- #persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
- #persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
- #record_coactivation(trace_id_a, trace_id_b) ⇒ Object
- #restore_traces(traces) ⇒ Object
- #retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
- #retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
- #retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
- #save_to_local ⇒ Object
- #snapshot_dirty_state ⇒ Object
- #store(trace) ⇒ Object
- #synchronize ⇒ Object
- #walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
Constructor Details
#initialize(partition_id: nil) ⇒ Store
Returns a new instance of Store.
20 21 22 23 24 25 26 27 28 29 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 20 def initialize(partition_id: nil) @mutex = Mutex.new @traces = {} @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @partition_id = partition_id || resolve_partition_id @traces_dirty = false @associations_dirty = false @persisted_trace_rows = {} load_from_local end |
Instance Attribute Details
#associations ⇒ Object (readonly)
Returns the value of attribute associations.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def associations @associations end |
#traces ⇒ Object (readonly)
Returns the value of attribute traces.
18 19 20 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 18 def traces @traces end |
Instance Method Details
#all_traces(min_strength: 0.0) ⇒ Object
97 98 99 100 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 97 def all_traces(min_strength: 0.0) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:strength] >= min_strength } end |
#clear_dirty_flags(trace_rows_snapshot) ⇒ Object
230 231 232 233 234 235 236 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 230 def clear_dirty_flags(trace_rows_snapshot) @mutex.synchronize do @traces_dirty = false @associations_dirty = false @persisted_trace_rows = trace_rows_snapshot end end |
#count ⇒ Object
102 103 104 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 102 def count @mutex.synchronize { @traces.size } end |
#delete(trace_id) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 45 def delete(trace_id) @mutex.synchronize do removed_trace = @traces.delete(trace_id) @traces.each_value { |trace| trace[:associated_traces]&.delete(trace_id) } removed_links = @associations.delete(trace_id) @associations.each_value { |links| links.delete(trace_id) } @traces_dirty = true if removed_trace @associations_dirty = true if removed_trace || removed_links end end |
#firmware_traces ⇒ Object
108 109 110 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 108 def firmware_traces retrieve_by_type(:firmware) end |
#flush ⇒ Object
112 113 114 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 112 def flush save_to_local end |
#get(trace_id) ⇒ Object
41 42 43 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 41 def get(trace_id) @mutex.synchronize { @traces[trace_id] } end |
#load_from_local ⇒ Object
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 238 def load_from_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection db[:memory_traces].where(partition_id: @partition_id).each do |row| @traces[row[:trace_id]] = deserialize_trace_from_db(row) end load_local_associations(db) @persisted_trace_rows = @traces.transform_values { |trace| serialize_trace_for_db(trace) } @traces_dirty = false @associations_dirty = false end |
#persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 206 def persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, dirty) assoc_scope_ids = (scoped_trace_ids + memory_trace_ids).uniq return unless (dirty || !stale_ids.empty?) && !assoc_scope_ids.empty? association_columns = db[:memory_associations].columns partitioned_associations = association_columns.include?(:partition_id) if partitioned_associations db[:memory_associations].where(partition_id: @partition_id).delete else assoc_scope_ids.each_slice(ASSOCIATION_LOAD_BATCH_SIZE) do |ids| db[:memory_associations].where(trace_id_a: ids).delete db[:memory_associations].where(trace_id_b: ids).delete end end associations_snapshot.each do |id_a, targets| targets.each do |id_b, count| row = { trace_id_a: id_a, trace_id_b: id_b, coactivation_count: count } row[:partition_id] = @partition_id if partitioned_associations db[:memory_associations].insert(row) end end end |
#persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) ⇒ Object
196 197 198 199 200 201 202 203 204 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 196 def persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) return unless trace_changes[:dirty] || !stale_ids.empty? ds = db[:memory_traces] trace_changes[:changed_ids].each do |trace_id| ds.insert_conflict(:replace).insert(trace_rows_snapshot.fetch(trace_id)) end db[:memory_traces].where(trace_id: stale_ids).delete unless stale_ids.empty? end |
#record_coactivation(trace_id_a, trace_id_b) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 83 def record_coactivation(trace_id_a, trace_id_b) return if trace_id_a == trace_id_b @mutex.synchronize do @associations[trace_id_a][trace_id_b] += 1 @associations[trace_id_b][trace_id_a] += 1 @associations_dirty = true threshold = Helpers::Trace::COACTIVATION_THRESHOLD @traces_dirty = true if @associations[trace_id_a][trace_id_b] >= threshold && link_traces(trace_id_a, trace_id_b) end end |
#restore_traces(traces) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 116 def restore_traces(traces) snapshot = Array(traces).each_with_object({}) do |trace, memo| next unless trace.is_a?(Hash) && trace[:trace_id] restored = trace.dup restored[:partition_id] ||= @partition_id memo[restored[:trace_id]] = restored end @mutex.synchronize do @traces = snapshot @associations = Hash.new { |h, k| h[k] = Hash.new(0) } @traces_dirty = true @associations_dirty = true end flush end |
#retrieve_associated(trace_id, min_strength: 0.0, limit: 20) ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 72 def retrieve_associated(trace_id, min_strength: 0.0, limit: 20) trace = @traces[trace_id] return [] unless trace trace[:associated_traces] .filter_map { |id| @traces[id] } .select { |t| t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) ⇒ Object
65 66 67 68 69 70 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 65 def retrieve_by_domain(domain_tag, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:domain_tags].include?(domain_tag) && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#retrieve_by_type(type, min_strength: 0.0, limit: 100) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 58 def retrieve_by_type(type, min_strength: 0.0, limit: 100) snapshot = @mutex.synchronize { @traces.values } snapshot.select { |t| t[:trace_type] == type && t[:strength] >= min_strength } .sort_by { |t| -t[:strength] } .first(limit) end |
#save_to_local ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 163 def save_to_local return unless defined?(Legion::Data::Local) && Legion::Data::Local.connected? return unless Legion::Data::Local.connection.table_exists?(:memory_traces) db = Legion::Data::Local.connection snapshots = snapshot_dirty_state return unless snapshots traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty = snapshots db.transaction do scoped_trace_ids = db[:memory_traces].where(partition_id: @partition_id).select_map(:trace_id) memory_trace_ids = traces_snapshot.keys stale_ids = scoped_trace_ids - memory_trace_ids persist_dirty_traces(db, trace_rows_snapshot, trace_changes, stale_ids) persist_dirty_associations(db, associations_snapshot, scoped_trace_ids, memory_trace_ids, stale_ids, associations_dirty) end clear_dirty_flags(trace_rows_snapshot) end |
#snapshot_dirty_state ⇒ Object
182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 182 def snapshot_dirty_state traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty = @mutex.synchronize do ts = @traces.transform_values(&:dup) as = @associations.each_with_object({}) { |(tid, targets), memo| memo[tid] = targets.dup } trs = ts.transform_values { |trace| serialize_trace_for_db(trace) } changed_trace_ids = trs.each_key.reject { |trace_id| trs[trace_id] == @persisted_trace_rows[trace_id] } trace_changes = { dirty: @traces_dirty || changed_trace_ids.any?, changed_ids: changed_trace_ids } [ts, as, trs, trace_changes, @associations_dirty] end return nil unless trace_changes[:dirty] || associations_dirty [traces_snapshot, associations_snapshot, trace_rows_snapshot, trace_changes, associations_dirty] end |
#store(trace) ⇒ Object
31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 31 def store(trace) persisted_trace = trace.dup persisted_trace[:partition_id] ||= @partition_id @mutex.synchronize do @traces_dirty = true if @traces[persisted_trace[:trace_id]] != persisted_trace @traces[persisted_trace[:trace_id]] = persisted_trace end persisted_trace[:trace_id] end |
#synchronize ⇒ Object
106 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 106 def synchronize(&) = @mutex.synchronize(&) |
#walk_associations(start_id:, max_hops: 12, min_strength: 0.1) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/legion/extensions/agentic/memory/trace/helpers/store.rb', line 134 def walk_associations(start_id:, max_hops: 12, min_strength: 0.1) snapshot = @mutex.synchronize { @traces.dup } return [] unless snapshot.key?(start_id) results = [] visited = Set.new([start_id]) queue = [[start_id, 0, [start_id]]] until queue.empty? current_id, depth, path = queue.shift next unless (current = snapshot[current_id]) current[:associated_traces].each do |neighbor_id| next if visited.include?(neighbor_id) neighbor = snapshot[neighbor_id] next unless neighbor next unless neighbor[:strength] >= min_strength visited << neighbor_id neighbor_path = path + [neighbor_id] results << { trace_id: neighbor_id, depth: depth + 1, path: neighbor_path } queue << [neighbor_id, depth + 1, neighbor_path] if depth + 1 < max_hops end end results end |