Class: SwarmSDK::V3::Memory::Consolidator

Inherits:
Object
  • Object
show all
Defined in:
lib/swarm_sdk/v3/memory/consolidator.rb

Overview

Periodic maintenance for the memory system

Performs:

  • Deduplication and canonicalization of similar cards

  • Cluster summary updates

  • Conflict detection between contradicting cards

  • Merging of redundant cards

Examples:

consolidator = Consolidator.new(adapter: adapter, embedder: embedder)
consolidator.run

Constant Summary collapse

SUPPORTING_EDGE_TYPES =

Edge types that indicate a supporting relationship between cards. Pairs connected by these edges should never be flagged as contradictions. Derived from Edge::TYPES so new edge types are non-conflicting by default.

(Edge::TYPES - [:contradicts]).freeze

Instance Method Summary collapse

Constructor Details

#initialize(adapter:, embedder:) ⇒ Consolidator

Returns a new instance of Consolidator.

Parameters:



25
26
27
28
29
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 25

# Build a consolidator bound to a storage adapter and an embedder.
#
# @param adapter [Object] storage adapter; stored in @adapter and used by
#   the maintenance methods to list, read and write cards, edges and clusters
# @param embedder [Object] embedder; stored in @embedder
def initialize(adapter:, embedder:)
  @adapter, @embedder = adapter, embedder
  # The configuration singleton is captured once, at construction time.
  @config = Configuration.instance
end

Instance Method Details

#deduplicate ⇒ Integer

Find and merge duplicate cards

Returns:

  • (Integer)

    Number of duplicates merged



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 76

# Find and merge duplicate cards.
#
# Each card that has an embedding is used as a vector-search query
# (top_k: 5, threshold: the configured dedup threshold). Every hit other
# than the querying card itself that has not been handled yet is merged
# into the querying card, which acts as the canonical copy.
#
# @return [Integer] number of duplicates merged
def deduplicate
  # Ids of cards already used as a canonical or merged away as a duplicate.
  handled_ids = Set.new

  @adapter.list_cards.sum do |canonical|
    next 0 if handled_ids.include?(canonical.id) || !canonical.embedding

    matches = @adapter.vector_search(
      canonical.embedding,
      top_k: 5,
      threshold: @config.consolidator_dedup_threshold,
    )

    merged_here = matches.count do |match|
      match_id = match[:id]
      next false if match_id == canonical.id || handled_ids.include?(match_id)

      # The hit may reference a card that can no longer be read.
      twin = @adapter.read_card(match_id)
      next false unless twin

      merge_cards(canonical: canonical, duplicate: twin)
      handled_ids << twin.id
      true
    end

    handled_ids << canonical.id
    merged_here
  end
end

#deduplicate_with_progress(base_completed, total_items) ⇒ Integer

Find and merge duplicate cards with progress events

Parameters:

  • base_completed (Integer)

    Items completed before this phase

  • total_items (Integer)

    Total items across all phases

Returns:

  • (Integer)

    Number of duplicates merged



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 206

# Find and merge duplicate cards, emitting one progress event per card.
#
# Only cards that have an embedding are visited. A progress event is
# emitted for every visited card, including cards that were already
# handled earlier in the pass.
#
# @param base_completed [Integer] items completed before this phase
# @param total_items [Integer] total items across all phases
# @return [Integer] number of duplicates merged
def deduplicate_with_progress(base_completed, total_items)
  embedded_cards = @adapter.list_cards.select(&:embedding)
  phase_total = embedded_cards.size
  # Ids of cards already used as a canonical or merged away as a duplicate.
  handled_ids = Set.new
  merged_count = 0

  embedded_cards.each.with_index(1) do |canonical, position|
    unless handled_ids.include?(canonical.id)
      hits = @adapter.vector_search(
        canonical.embedding,
        top_k: 5,
        threshold: @config.consolidator_dedup_threshold,
      )

      hits.each do |hit|
        hit_id = hit[:id]
        next if hit_id == canonical.id || handled_ids.include?(hit_id)

        # The hit may reference a card that can no longer be read.
        twin = @adapter.read_card(hit_id)
        next unless twin

        merge_cards(canonical: canonical, duplicate: twin)
        handled_ids << twin.id
        merged_count += 1
      end

      handled_ids << canonical.id
    end

    EventStream.emit(
      type: "memory_defrag_progress",
      phase: "consolidate_dedup",
      description: "Finding and merging duplicate memory cards",
      phase_current: position,
      phase_total: phase_total,
      overall_current: base_completed + position,
      overall_total: total_items,
    )
  end

  merged_count
end

#detect_conflicts ⇒ Integer

Detect contradicting cards and create conflict edges

Finds pairs of cards whose embeddings are similar enough to concern the same topic but not similar enough to count as duplicates, where both cards are of type constraint, decision, or fact. Creates `contradicts` edges between them.

Returns:

  • (Integer)

    Number of conflicts detected



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 113

# Detect potentially contradicting cards and create conflict edges.
#
# Candidates are cards of type :constraint, :decision or :fact that have an
# embedding. For each candidate a vector search is run with the configured
# conflict threshold; a hit is flagged when it
#
# * is not the candidate itself,
# * is not above the dedup threshold (that would be a duplicate),
# * has not already been flagged during this run,
# * resolves to a readable card of a candidate type, and
# * is not already linked to the candidate by a :contradicts edge or by
#   any edge type in SUPPORTING_EDGE_TYPES.
#
# The candidate's edges are fetched at most once per candidate, and only
# when a hit actually needs the check.
#
# @return [Integer] number of :contradicts edges written
def detect_conflicts
  conflict_types = [:constraint, :decision, :fact]
  candidates = @adapter.list_cards.select { |c| conflict_types.include?(c.type) && c.embedding }
  conflict_count = 0
  seen_pairs = Set.new

  candidates.each do |card|
    # Find similar cards that might contradict
    similar = @adapter.vector_search(card.embedding, top_k: 5, threshold: @config.consolidator_conflict_threshold)

    # Loaded lazily on the first hit that survives the cheap filters, then
    # reused for every remaining hit of this card.
    card_edges = nil

    similar.each do |result|
      other_id = result[:id]
      next if other_id == card.id
      next if result[:similarity] > @config.consolidator_dedup_threshold # Too similar = duplicate, not conflict

      # Order-independent key so A/B and B/A count as the same pair.
      pair_key = [card.id, other_id].sort.join(":")
      next if seen_pairs.include?(pair_key)

      other = @adapter.read_card(other_id)
      next unless other
      next unless conflict_types.include?(other.type)

      card_edges ||= @adapter.edges_for(card.id)

      # Skip pairs that are already marked as contradicting, and pairs
      # connected by supporting edges — those reinforce the same fact in
      # different phrasings, not contradictions.
      already_linked = card_edges.any? do |e|
        (e.from_id == other_id || e.to_id == other_id) &&
          (e.type == :contradicts || SUPPORTING_EDGE_TYPES.include?(e.type))
      end
      next if already_linked

      # Cards are similar enough to be about the same topic but different
      # enough to potentially contradict — flag them
      @adapter.write_edge(
        Edge.new(
          from_id: card.id,
          to_id: other_id,
          type: :contradicts,
          weight: result[:similarity],
        ),
      )
      seen_pairs.add(pair_key)
      conflict_count += 1
    end
  end

  conflict_count
end

#detect_conflicts_with_progress(base_completed, total_items) ⇒ Integer

Detect contradicting cards with progress events

Parameters:

  • base_completed (Integer)

    Items completed before this phase

  • total_items (Integer)

    Total items across all phases

Returns:

  • (Integer)

    Number of conflicts detected



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 251

# Detect potentially contradicting cards, emitting one progress event per
# candidate card.
#
# Candidates are cards of type :constraint, :decision or :fact that have an
# embedding. For each candidate a vector search is run with the configured
# conflict threshold; a hit is flagged with a :contradicts edge unless it is
# the candidate itself, is above the dedup threshold, was already flagged in
# this run, cannot be read, is not of a candidate type, or is already linked
# to the candidate by a :contradicts edge or an edge type in
# SUPPORTING_EDGE_TYPES.
#
# The candidate's edges are fetched at most once per candidate, and only
# when a hit actually needs the check.
#
# @param base_completed [Integer] items completed before this phase
# @param total_items [Integer] total items across all phases
# @return [Integer] number of :contradicts edges written
def detect_conflicts_with_progress(base_completed, total_items)
  conflict_types = [:constraint, :decision, :fact]
  candidates = @adapter.list_cards.select { |c| conflict_types.include?(c.type) && c.embedding }
  conflict_count = 0
  seen_pairs = Set.new

  candidates.each_with_index do |card, index|
    # Find similar cards that might contradict
    similar = @adapter.vector_search(card.embedding, top_k: 5, threshold: @config.consolidator_conflict_threshold)

    # Loaded lazily on the first hit that survives the cheap filters, then
    # reused for every remaining hit of this card.
    card_edges = nil

    similar.each do |result|
      other_id = result[:id]
      next if other_id == card.id
      next if result[:similarity] > @config.consolidator_dedup_threshold # Too similar = duplicate, not conflict

      # Order-independent key so A/B and B/A count as the same pair.
      pair_key = [card.id, other_id].sort.join(":")
      next if seen_pairs.include?(pair_key)

      other = @adapter.read_card(other_id)
      next unless other
      next unless conflict_types.include?(other.type)

      card_edges ||= @adapter.edges_for(card.id)

      # Skip pairs already marked as contradicting and pairs connected by a
      # supporting edge type.
      already_linked = card_edges.any? do |e|
        (e.from_id == other_id || e.to_id == other_id) &&
          (e.type == :contradicts || SUPPORTING_EDGE_TYPES.include?(e.type))
      end
      next if already_linked

      @adapter.write_edge(
        Edge.new(
          from_id: card.id,
          to_id: other_id,
          type: :contradicts,
          weight: result[:similarity],
        ),
      )
      seen_pairs.add(pair_key)
      conflict_count += 1
    end

    EventStream.emit(
      type: "memory_defrag_progress",
      phase: "consolidate_conflicts",
      description: "Detecting contradicting information in memory",
      phase_current: index + 1,
      phase_total: candidates.size,
      overall_current: base_completed + index + 1,
      overall_total: total_items,
    )
  end

  conflict_count
end

#run ⇒ Hash

Run full consolidation

Returns:

  • (Hash)

    Summary of actions taken



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 34

# Run full consolidation: deduplication, then conflict detection, then
# cluster summary updates, in that order.
#
# @return [Hash] summary of actions taken, with the keys
#   :duplicates_merged, :conflicts_detected and :clusters_updated
def run
  # Hash values are evaluated top to bottom, so the phases still execute
  # in the order dedup -> conflicts -> clusters.
  {
    duplicates_merged: deduplicate,
    conflicts_detected: detect_conflicts,
    clusters_updated: update_cluster_summaries,
  }
end

#run_with_progress(base_completed, total_items) ⇒ Hash

Run full consolidation with progress events

Parameters:

  • base_completed (Integer)

    Items completed before this phase

  • total_items (Integer)

    Total items across all phases

Returns:

  • (Hash)

    Summary of actions taken



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 51

# Run full consolidation with progress events.
#
# The phases run in the order dedup -> conflicts -> clusters. Each phase
# receives as its base the number of items completed before it, so that the
# overall_current values it emits continue from the previous phase.
#
# @param base_completed [Integer] items completed before this phase
# @param total_items [Integer] total items across all phases
# @return [Hash] summary of actions taken, with the keys
#   :duplicates_merged, :conflicts_detected and :clusters_updated
def run_with_progress(base_completed, total_items)
  # Size of the dedup phase, measured BEFORE it runs. The dedup phase emits
  # one event per card with an embedding as the store looked at its start;
  # counting afterwards could come up short if merging removed cards, and
  # overall progress would then jump backwards at the phase boundary.
  dedup_items = @adapter.list_cards.count(&:embedding)
  deduped = deduplicate_with_progress(base_completed, total_items)

  # Size of the conflict phase, measured right before it runs so it matches
  # the candidate set that phase iterates (and reports) over.
  conflict_types = [:constraint, :decision, :fact]
  conflict_items = @adapter.list_cards.count { |c| conflict_types.include?(c.type) && c.embedding }
  conflict_base = base_completed + dedup_items
  conflicts = detect_conflicts_with_progress(conflict_base, total_items)

  clusters_updated = update_cluster_summaries_with_progress(
    conflict_base + conflict_items,
    total_items,
  )

  {
    duplicates_merged: deduped,
    conflicts_detected: conflicts,
    clusters_updated: clusters_updated,
  }
end

#update_cluster_summaries ⇒ Integer

Update rolling summaries for all clusters

Returns:

  • (Integer)

    Number of clusters updated



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 171

# Refresh the derived fields of every cluster that still has readable cards.
#
# For each such cluster this recomputes:
# * key_entities    — the 10 most frequent entities across member cards
# * rolling_summary — member texts joined with " | ", cut to 500 characters
# * embedding       — average_embedding of the member embeddings (left
#                     untouched when no member has an embedding)
# * updated_at      — the current time
# and writes the cluster back through the adapter.
#
# @return [Integer] number of clusters updated
def update_cluster_summaries
  @adapter.list_clusters.count do |cluster|
    next false if cluster.card_ids.empty?

    # Ids whose card can no longer be read are dropped.
    members = cluster.card_ids.filter_map { |card_id| @adapter.read_card(card_id) }
    next false if members.empty?

    # Rank entities by how many times they occur across the members.
    entity_counts = members.flat_map(&:entities).tally
    ranked = entity_counts.sort_by { |_entity, occurrences| -occurrences }
    cluster.key_entities = ranked.first(10).map(&:first)

    cluster.rolling_summary = members.map(&:text).join(" | ")[0, 500]

    vectors = members.filter_map(&:embedding)
    cluster.embedding = average_embedding(vectors) unless vectors.empty?

    cluster.updated_at = Time.now
    @adapter.write_cluster(cluster)
    true
  end
end

#update_cluster_summaries_with_progress(base_completed, total_items) ⇒ Integer

Update cluster summaries with progress events

Parameters:

  • base_completed (Integer)

    Items completed before this phase

  • total_items (Integer)

    Total items across all phases

Returns:

  • (Integer)

    Number of clusters updated



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/swarm_sdk/v3/memory/consolidator.rb', line 316

# Refresh the derived fields of every cluster that still has readable cards,
# emitting one progress event per cluster (including clusters that were
# skipped because they had no card ids or no readable cards).
#
# For each updated cluster this recomputes key_entities (10 most frequent
# member entities), rolling_summary (member texts joined with " | ", cut to
# 500 characters), embedding (average_embedding of member embeddings, left
# untouched when no member has one) and updated_at, then writes the cluster.
#
# @param base_completed [Integer] items completed before this phase
# @param total_items [Integer] total items across all phases
# @return [Integer] number of clusters updated
def update_cluster_summaries_with_progress(base_completed, total_items)
  clusters = @adapter.list_clusters
  refreshed = 0

  clusters.each.with_index(1) do |cluster, position|
    # Ids whose card can no longer be read are dropped; a cluster without
    # card ids triggers no reads at all.
    members =
      if cluster.card_ids.empty?
        []
      else
        cluster.card_ids.filter_map { |card_id| @adapter.read_card(card_id) }
      end

    unless members.empty?
      entity_counts = members.flat_map(&:entities).tally
      ranked = entity_counts.sort_by { |_entity, occurrences| -occurrences }
      cluster.key_entities = ranked.first(10).map(&:first)

      cluster.rolling_summary = members.map(&:text).join(" | ")[0, 500]

      vectors = members.filter_map(&:embedding)
      cluster.embedding = average_embedding(vectors) unless vectors.empty?

      cluster.updated_at = Time.now
      @adapter.write_cluster(cluster)
      refreshed += 1
    end

    EventStream.emit(
      type: "memory_defrag_progress",
      phase: "consolidate_clusters",
      description: "Updating cluster summaries and embeddings",
      phase_current: position,
      phase_total: clusters.size,
      overall_current: base_completed + position,
      overall_total: total_items,
    )
  end

  refreshed
end