Class: SwarmSDK::V3::Memory::IngestionPipeline

Inherits:
Object
Defined in:
lib/swarm_sdk/v3/memory/ingestion_pipeline.rb

Overview

Async post-turn processing pipeline

After each conversation turn, this pipeline:

  1. Segments the turn into atomic ideas (using the LLM when one is available, with a heuristic fallback)

  2. Creates memory cards (at most 250 words each)

  3. Extracts entities

  4. Creates graph edges between related cards (both within the turn and across turns)

  5. Assigns cards to clusters (updating decision logs for decision cards)

  6. Generates embeddings
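The heuristic fallback in step 1 and the size cap in step 2 could look roughly like the minimal Ruby sketch below. It assumes that paragraph breaks separate ideas and that whole sentences are packed greedily into segments of at most 250 words. `HeuristicSegmenter`, `segments`, and `pack_sentences` are hypothetical names, not part of SwarmSDK, and the real fallback may split text differently.

```ruby
# Hypothetical sketch of a heuristic segmenter with a 250-word cap.
# Not the SwarmSDK implementation; all names here are illustrative.
module HeuristicSegmenter
  MAX_WORDS = 250

  module_function

  # Treat blank-line-separated paragraphs as candidate ideas, then
  # pack each paragraph's sentences into chunks of <= max_words words.
  def segments(text, max_words: MAX_WORDS)
    text.split(/\n\s*\n/)
        .map(&:strip)
        .reject(&:empty?)
        .flat_map { |para| pack_sentences(para, max_words) }
  end

  # Greedily group sentences into chunks; a single sentence longer than
  # max_words is hard-split by word count.
  def pack_sentences(paragraph, max_words)
    chunks = []
    current = []
    count = 0
    paragraph.split(/(?<=[.!?])\s+/).each do |sentence|
      words = sentence.split
      if words.size > max_words
        # Flush what we have, then slice the oversized sentence.
        chunks << current.join(" ") unless current.empty?
        current = []
        count = 0
        words.each_slice(max_words) { |slice| chunks << slice.join(" ") }
        next
      end
      if count + words.size > max_words
        # Adding this sentence would exceed the cap: start a new chunk.
        chunks << current.join(" ")
        current = []
        count = 0
      end
      current << sentence
      count += words.size
    end
    chunks << current.join(" ") unless current.empty?
    chunks
  end
end
```

For example, `HeuristicSegmenter.segments("First idea. Second idea.\n\nThird idea.")` returns `["First idea. Second idea.", "Third idea."]`. Packing whole sentences keeps each card readable, at the cost of sometimes grouping two loosely related sentences; an LLM segmenter can draw those boundaries more precisely.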

Examples:

pipeline = IngestionPipeline.new(adapter: adapter, embedder: embedder)
pipeline.ingest(turn_text: "The API uses JWT...", turn_id: "turn_001")


Constructor Details

#initialize(adapter:, embedder:, chat: nil) ⇒ IngestionPipeline

Returns a new instance of IngestionPipeline.

Parameters:

  • adapter (Adapters::Base)

    Storage adapter

  • embedder (Embedder)

    Text embedder

  • chat (RubyLLM::Chat, nil) (defaults to: nil)

    LLM for segmentation (nil = simple splitting)

# File 'lib/swarm_sdk/v3/memory/ingestion_pipeline.rb', line 23

def initialize(adapter:, embedder:, chat: nil)
  @adapter = adapter
  @embedder = embedder
  @chat = chat
  @config = Configuration.instance
end

Instance Method Details

#ingest(turn_text:, turn_id:) ⇒ Array<Card>

Ingest a conversation turn into memory cards

Parameters:

  • turn_text (String)

    Full turn text (user + assistant + tool calls)

  • turn_id (String)

    Unique turn identifier

Returns:

  • (Array<Card>)

    Created cards

# File 'lib/swarm_sdk/v3/memory/ingestion_pipeline.rb', line 35

def ingest(turn_text:, turn_id:)
  DebugLog.log("ingestion", "ingest: turn_id=#{turn_id}, text_len=#{turn_text.size}")

  segments = DebugLog.time("ingestion", "segment_turn") do
    segment_turn(turn_text)
  end
  DebugLog.log("ingestion", "segments=#{segments.size}")

  cards = DebugLog.time("ingestion", "create_cards(#{segments.size})") do
    segments.map { |segment| create_card(text: segment, turn_id: turn_id) }
  end

  DebugLog.time("ingestion", "create_turn_edges") { create_turn_edges(cards) }
  DebugLog.time("ingestion", "cross_turn_entity_edges") { create_cross_turn_entity_edges(cards) }
  DebugLog.time("ingestion", "assign_to_clusters") { assign_to_clusters(cards) }

  DebugLog.log("ingestion", "ingest complete: #{cards.size} cards created")
  cards
end
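`ingest` assigns the result of `DebugLog.time` (for example, `segments = DebugLog.time("ingestion", "segment_turn") { segment_turn(turn_text) }`), so the timing helper must return its block's value. A minimal sketch of a helper with that property follows. `MiniDebugLog` is a hypothetical stand-in that records lines in memory; SwarmSDK's actual `DebugLog` may write elsewhere or behave differently.

```ruby
# Hypothetical stand-in for a DebugLog-style helper (not SwarmSDK's code).
# It keeps log lines in memory; a real logger might write to $stderr.
module MiniDebugLog
  @enabled = true
  @lines = []

  class << self
    attr_accessor :enabled
    attr_reader :lines

    # Record a categorized message when logging is enabled.
    def log(category, message)
      return unless enabled

      @lines << "[#{category}] #{message}"
    end

    # Time the block with a monotonic clock and return the block's value.
    # The ensure clause's value is discarded, so the method still returns
    # whatever the block returned; if the block raises, the timing line is
    # logged and the exception propagates.
    def time(category, label)
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      yield
    ensure
      elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000.0
      log(category, format("%s took %.2fms", label, elapsed_ms))
    end
  end
end
```

Using `ensure` rather than timing only the success path means slow or failing stages still leave a trace in the log, which is usually what you want when debugging an asynchronous pipeline.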