Class: Phronomy::Memory::Retrieval::Semantic

Inherits:
Base
  • Object
Defined in:
lib/phronomy/memory/retrieval/semantic.rb

Overview

Retrieval strategy that returns the k semantically closest messages to the query.

Messages are indexed in a VectorStore on save. On retrieval, the query is embedded and the k nearest messages are returned. Falls back to the k most recent messages when the query is nil or blank.

Examples:

retrieval = Phronomy::Memory::Retrieval::Semantic.new(
  embeddings: Phronomy::Embeddings::RubyLLMEmbeddings.new(model: "text-embedding-3-small"),
  k: 10
)
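
As a rough illustration of what "the k nearest messages" means, the sketch below ranks texts by cosine similarity over toy bag-of-words vectors. `ToyEmbeddings`, its vocabulary, and the `cosine` helper are hypothetical stand-ins and not part of Phronomy; the only interface taken from this page is that an embeddings adapter responds to `embed(text)`. The similarity metric actually used depends on the configured vector store.

```ruby
# Hypothetical stand-in for an embeddings adapter: counts vocabulary words.
class ToyEmbeddings
  VOCAB = %w[ruby python deploy database].freeze

  def embed(text)
    words = text.downcase.scan(/\w+/)
    VOCAB.map { |term| words.count(term).to_f }
  end
end

# Cosine similarity between two equal-length vectors (0.0 if either is all zeros).
def cosine(a, b)
  dot = a.zip(b).sum { |x, y| x * y }
  norm = Math.sqrt(a.sum { |x| x * x }) * Math.sqrt(b.sum { |x| x * x })
  norm.zero? ? 0.0 : dot / norm
end

embeddings = ToyEmbeddings.new
texts = [
  "How do I deploy a Ruby app?",
  "Python database drivers",
  "Ruby database migrations"
]

# Embed the query, then keep the k texts with the highest similarity.
query_vec = embeddings.embed("ruby deploy")
k = 2
nearest = texts.max_by(k) { |t| cosine(query_vec, embeddings.embed(t)) }
```

With these toy vectors, the deployment question ranks first and the Python text is left out, since it shares no vocabulary words with the query.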

Instance Method Summary

Constructor Details

#initialize(embeddings:, store: nil, k: 10, max_index_size: nil) ⇒ Semantic

Returns a new instance of Semantic.

Parameters:

  • store (Phronomy::VectorStore::Base) (defaults to: nil)

    vector store; defaults to a new Phronomy::VectorStore::InMemory instance

  • embeddings (Phronomy::Embeddings::Base)

    embeddings adapter

  • k (Integer) (defaults to: 10)

    number of messages to retrieve

  • max_index_size (Integer, nil) (defaults to: nil)

    maximum number of entries kept in the local index. When nil, the index grows without bound. When the limit is exceeded, the oldest entries (by insertion order) are evicted.



# File 'lib/phronomy/memory/retrieval/semantic.rb', line 24

def initialize(embeddings:, store: nil, k: 10, max_index_size: nil)
  @store = store || Phronomy::VectorStore::InMemory.new
  @embeddings = embeddings
  @k = k
  @index = {}   # id => message  (insertion-ordered via Ruby Hash)
  @counter = 0
  @max_index_size = max_index_size
  @mutex = Mutex.new
  @indexed_object_ids = {}  # thread_id => { object_id => true }
end
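
`evict_oldest!` is not shown on this page. The sketch below only illustrates the property that the `@index` comment relies on: a Ruby Hash iterates in insertion order, so its first pair is the oldest entry. The `evict_oldest` helper and the `max_size` name are hypothetical.

```ruby
# Hypothetical helper: drop the oldest entries until the hash fits max_size.
# Ruby Hashes preserve insertion order, so Hash#shift removes the oldest pair.
def evict_oldest(index, max_size)
  evicted = []
  evicted << index.shift.first while index.size > max_size
  evicted
end

index = {}
4.times { |i| index["t1:#{i}"] = "message #{i}" }

# Trim the four-entry index down to the two newest entries.
evicted = evict_oldest(index, 2)
```

A real implementation would presumably also remove each evicted id from the vector store, as `clear_index` does, so that the two stay in sync.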

Instance Method Details

#clear_index(thread_id:) ⇒ Object

Remove all indexed messages for a thread from both the local index and the vector store, and reset the thread's deduplication record.

Parameters:

  • thread_id (String)


# File 'lib/phronomy/memory/retrieval/semantic.rb', line 70

def clear_index(thread_id:)
  @mutex.synchronize do
    ids = @index.keys.select { |id| id.start_with?("#{thread_id}:") }
    ids.each do |id|
      @index.delete(id)
      @store.remove(id: id)
    end
    @indexed_object_ids.delete(thread_id)
  end
end
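
One consequence of the prefix match above: ids have the form `"#{thread_id}:#{counter}"`, so a thread id that itself contains a colon can be matched by a shorter thread id. The sketch below applies the same `start_with?` test to a plain array; the thread ids are made up for illustration.

```ruby
# Ids in the shape built by #index: "<thread_id>:<counter>".
ids = ["a:0", "a:1", "a:b:2", "ab:3"]

# Same prefix test as in clear_index.
matches = ->(thread_id) { ids.select { |id| id.start_with?("#{thread_id}:") } }

for_a  = matches.call("a")    # also catches "a:b:2", which belongs to thread "a:b"
for_ab = matches.call("ab")   # unaffected: "ab:" is not a prefix of any "a:..." id
```

If thread ids can contain ":", comparing the stored `metadata[:thread_id]` exactly would avoid the overlap. Whether this matters depends on how thread ids are generated.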

#index(thread_id:, messages:) ⇒ Object

Index a new batch of messages so they are searchable on future #select calls. Called by ConversationManager#save.

Messages are deduplicated by object identity: if a message object has already been indexed for the given thread_id, it is skipped (no duplicate embed call).

Parameters:

  • thread_id (String)
  • messages (Array)


# File 'lib/phronomy/memory/retrieval/semantic.rb', line 43

def index(thread_id:, messages:)
  messages.each do |msg|
    # Fast path: skip already-indexed messages without calling embed.
    already_indexed = @mutex.synchronize do
      (@indexed_object_ids[thread_id] ||= {})[msg.object_id]
    end
    next if already_indexed

    embedding = @embeddings.embed(msg.content.to_s)
    @mutex.synchronize do
      # Re-check inside lock to handle concurrent callers for the same thread.
      indexed = (@indexed_object_ids[thread_id] ||= {})
      next if indexed[msg.object_id]

      id = "#{thread_id}:#{@counter}"
      @counter += 1
      @store.add(id: id, embedding: embedding, metadata: {thread_id: thread_id, message: msg})
      @index[id] = msg
      indexed[msg.object_id] = true
      evict_oldest! if @max_index_size && @index.size > @max_index_size
    end
  end
end
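
The identity-based deduplication can be sketched in isolation as follows. `Message` is a hypothetical Struct standing in for whatever message class the caller uses; the `seen` hash plays the role of one thread's entry in `@indexed_object_ids`.

```ruby
Message = Struct.new(:content)  # hypothetical message type

seen = {}      # object_id => true, like one thread's dedup record
indexed = []   # messages that would have been embedded and stored

first  = Message.new("hello")
second = Message.new("hello")   # equal content, but a different object

[first, first, second].each do |msg|
  next if seen[msg.object_id]   # same check as the fast path in #index
  seen[msg.object_id] = true
  indexed << msg
end
```

Passing the same object twice indexes it once, while two distinct objects with identical content are both indexed. Callers that rebuild message objects on every save would therefore not benefit from this deduplication.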

#select(messages, query: nil, thread_id: nil) ⇒ Array

Return semantically relevant messages, or the k most recent messages when query is nil or blank.

Parameters:

  • messages (Array)

    full history (used as fallback when query is nil or blank)

  • query (String, nil) (defaults to: nil)

    current user input for semantic search

  • thread_id (String, nil) (defaults to: nil)

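    when provided, results are filtered to this thread. Filtering is applied after the store returns k * 3 candidates, so fewer than k messages may be returned.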

Returns:

  • (Array)


# File 'lib/phronomy/memory/retrieval/semantic.rb', line 87

def select(messages, query: nil, thread_id: nil)
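  if query && !query.strip.empty?
    # Embed the query outside the lock; only the store lookup is synchronized.
    query_embedding = @embeddings.embed(query)
    # Over-fetch (k * 3) so that filtering by thread can still leave up to k results.
    results = @mutex.synchronize { @store.search(query_embedding: query_embedding, k: @k * 3) }
    results
      .select { |r| thread_id.nil? || r[:metadata][:thread_id] == thread_id }
      .first(@k)
      .map { |r| r[:metadata][:message] }
  else
    # No usable query (nil or blank): fall back to the k most recent messages.
    messages.last(@k)
  end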
end
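
The over-fetch-then-filter step can be sketched on plain data. The `results` array below is hypothetical; it only mimics the shape that `#select` reads (a `:metadata` hash with `:thread_id` and `:message`) and is assumed to be ordered best match first.

```ruby
k = 2

# Hypothetical search results, best match first.
results = [
  {metadata: {thread_id: "other", message: "o1"}},
  {metadata: {thread_id: "t1",    message: "m1"}},
  {metadata: {thread_id: "other", message: "o2"}},
  {metadata: {thread_id: "other", message: "o3"}},
  {metadata: {thread_id: "other", message: "o4"}},
  {metadata: {thread_id: "other", message: "o5"}},
  {metadata: {thread_id: "t1",    message: "m2"}}
]

# Same filter / truncate / unwrap chain as in #select.
pick = lambda do |candidates, thread_id|
  candidates
    .select { |r| thread_id.nil? || r[:metadata][:thread_id] == thread_id }
    .first(k)
    .map { |r| r[:metadata][:message] }
end

top = results.first(k * 3)       # the store is asked for k * 3 candidates
for_t1  = pick.call(top, "t1")   # filter to one thread, then keep at most k
for_any = pick.call(top, nil)    # no thread filter
```

Here `for_t1` contains a single message even though the thread has two indexed: the second one ranks seventh and falls outside the k * 3 candidate window. When one store is shared by many threads, a per-thread search or a larger over-fetch factor may be worth considering.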