Class: Pikuri::VectorDb::Backend::Qdrant

Inherits:
Object
Defined in:
lib/pikuri/vector_db/backend/qdrant.rb

Overview

Thin Faraday HTTP client against a self-hosted Qdrant server (1.x REST API). This is the recommended persistent backend (see pikuri-vectordb/DESIGN.md for the Chroma-vs-Qdrant survey behind that call). It implements the same duck-typed Pikuri::VectorDb::Backend protocol as InMemory and Chroma: same method names, same return shapes, and the same ArgumentError contract on empty input and non-positive top_k.

Hand-rolled rather than a dependency on a qdrant-ruby gem, for the same reasons as Chroma: a handful of endpoints, Faraday already in the closure, the wire protocol auditable in one readable file. Qdrant’s REST surface has been stable across the 1.x line, so the track-it-by-hand cost is lower than Chroma’s.

Two ways to get one

  • **Bring your own.** Backend::Qdrant.new(host:, port:, collection:) against an existing Qdrant deployment.

  • **Let pikuri manage it.** Server::Qdrant.ensure_running supervises a container under the pikuri-internal-qdrant name; its #client(collection:) returns a Backend::Qdrant pre-pointed at it. Same server-vs-client split as Server::Chroma / Chroma.

Qdrant REST API

Endpoints used (all under /collections/{name}):

  • GET …/exists — existence probe without creation.

  • PUT /collections/{name} — create, with {vectors: {size:, distance: ‘Cosine’}}. A 409 means “already exists” and is treated as success.

  • PUT …/points?wait=true — insert-or-replace by point id. wait=true so the write is visible to the next read (the Indexer upserts, then Tools::Search queries).

  • POST …/points/search — k-NN search.

  • POST …/points/count — exact count, optionally payload-filtered (used by #count and #source_indexed?).

  • POST …/points/delete?wait=true — payload-filtered delete; used by #delete_by_source.

  • POST …/points/scroll — cursor-paged payload read; used by #sources_with_hashes.

  • DELETE /collections/{name} — drop the collection (used by #delete_all).

Point ids: chunk id → derived UUID

Qdrant point ids must be unsigned integers or UUIDs, never arbitrary strings, so the Chunk's readable “source:offset” id can't be used as the point id directly. Instead, each point id is derived deterministically as the first 16 bytes of SHA1(chunk.id), formatted as a UUID. Because the mapping is deterministic, re-upserting the same chunk id replaces the old point (the insert-or-replace contract); the collision probability is negligible at any corpus size pikuri will see. The real chunk id rides in the payload under the reserved id key and is restored on the way out, so callers never see the UUID.
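A minimal Ruby sketch of that derivation. The helper name `point_id_sketch` is hypothetical (the class's own private helper is not reproduced on this page), and the sketch assumes the 16 bytes are used as-is, with no RFC 4122 version or variant bits forced:

```ruby
require 'digest'

# Hypothetical helper, not the class's private method.
# First 16 bytes of SHA1(chunk_id) == first 32 hex chars of the hex digest,
# laid out in the 8-4-4-4-12 UUID form. Assumes no version bits are set.
def point_id_sketch(chunk_id)
  hex = Digest::SHA1.hexdigest(chunk_id.to_s)[0, 32]
  [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join('-')
end

first = point_id_sketch('notes/todo.md:0')
again = point_id_sketch('notes/todo.md:0')
other = point_id_sketch('notes/todo.md:512')
puts first
puts first == again # deterministic: a re-upsert targets the same point
puts first == other # a different chunk id gets a different point id
```

Because the UUID is a pure function of the chunk id, no id lookup table has to be stored anywhere to find a point again on re-upsert.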

Payload shape

Qdrant has no separate document store: everything beyond the vector lives in the point's payload. Three reserved keys carry the Chunk's fixed fields (id, source, text); the chunk's metadata Hash is merged alongside with keys stringified for JSON round-trip stability, and symbolized back on the way out. This is the same normalization as Chroma, so a chunk pulled from a query looks identical across backends.
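The round trip can be sketched as follows. `ChunkSketch`, `to_payload`, and `from_payload` are hypothetical stand-ins for Chunk and for the inline logic in #upsert and #query, and the sketch assumes metadata values are JSON-native (strings, numbers, booleans):

```ruby
require 'json'

# Hypothetical stand-in for Pikuri::VectorDb::Chunk (fields as documented here).
ChunkSketch = Struct.new(:id, :source, :text, :metadata, keyword_init: true)

RESERVED_KEYS = %w[id source text].freeze # mirrors RESERVED_PAYLOAD_KEYS

# Chunk -> payload: reserved keys first, then metadata with stringified keys
# (same order as #upsert, so a metadata key named "id" would overwrite it).
def to_payload(chunk)
  payload = { 'id' => chunk.id, 'source' => chunk.source, 'text' => chunk.text }
  chunk.metadata.each { |k, v| payload[k.to_s] = v }
  payload
end

# Payload -> chunk: reserved keys restored, every other key symbolized.
def from_payload(payload)
  meta = payload.reject { |k, _| RESERVED_KEYS.include?(k) }.transform_keys(&:to_sym)
  ChunkSketch.new(id: payload['id'] || '', source: payload['source'] || '',
                  text: payload['text'] || '', metadata: meta)
end

chunk = ChunkSketch.new(id: 'notes/todo.md:0', source: 'notes/todo.md',
                        text: 'buy milk', metadata: { offset: 0, hash: 'abc123' })
wire = JSON.parse(JSON.generate(to_payload(chunk))) # what goes over the wire and back
restored = from_payload(wire)
puts restored == chunk
```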

Cosine similarity (no conversion)

The collection is created with distance: ‘Cosine’. Unlike Chroma (which returns cosine distance, converted via 1 - d), Qdrant's score is already a cosine similarity (higher = better), so it passes through to Result untouched. (This same convention is why mem0's ranker is correct on Qdrant and inverted on pgvector; see pikuri-memory/DESIGN.md §“Root cause”.)
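A small sketch of the two conventions, using plain Ruby arrays and hypothetical helper names: a similarity score is used as-is, while a cosine distance needs the 1 - d conversion to land on the same higher-is-better scale:

```ruby
# Cosine similarity: dot(a, b) / (|a| * |b|). Higher = more similar.
def cosine_similarity(a, b)
  dot = a.zip(b).sum { |x, y| x * y }
  norm = ->(v) { Math.sqrt(v.sum { |x| x * x }) }
  dot / (norm.call(a) * norm.call(b))
end

# Cosine distance, the quantity Chroma reports: 1 - similarity.
def cosine_distance(a, b)
  1.0 - cosine_similarity(a, b)
end

a = [1.0, 0.0]
b = [1.0, 1.0]

qdrant_style = cosine_similarity(a, b)     # already a similarity: pass through
chroma_style = 1.0 - cosine_distance(a, b) # distance converted back via 1 - d
puts qdrant_style.round(4)
puts chroma_style.round(4)
```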

Vector-dim contract diverges from InMemory

Same divergence as Chroma, with one wrinkle: Qdrant fixes the dimension at *collection creation*, so #upsert creates the collection with the first batch's vector size. Later upserts or queries with a mismatched dimension get an HTTP 4xx, which propagates as RuntimeError: a different exception class from InMemory's ArgumentError, but the same loud-failure shape.
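To make the contract concrete, here is a hypothetical in-memory model of it (`FakeQdrantCollection` is illustrative, not a pikuri class, and its error text is not Qdrant's real response body). The first upsert fixes the dimension, a query before any upsert returns no hits, and a later mismatch raises a RuntimeError standing in for the HTTP 4xx:

```ruby
# Hypothetical in-memory model of the dimension contract; not a pikuri class.
class FakeQdrantCollection
  attr_reader :dim

  def initialize
    @dim = nil # nil = collection not created yet
    @points = {}
  end

  def upsert(id, vector)
    @dim ||= vector.size # first upsert "creates" the collection and fixes dim
    check_dim!(vector)
    @points[id] = vector
    nil
  end

  def query(vector)
    return [] if @dim.nil? # never upserted: no collection, so no hits
    check_dim!(vector)
    @points.keys
  end

  private

  # Stands in for the HTTP 4xx that Backend::Qdrant re-raises as RuntimeError.
  def check_dim!(vector)
    return if vector.size == @dim

    raise "simulated HTTP 4xx: collection dim is #{@dim}, got #{vector.size}"
  end
end

col = FakeQdrantCollection.new
p col.query([0.1, 0.2]) # no collection yet
col.upsert('p1', [0.1, 0.2, 0.3])
begin
  col.upsert('p2', [0.1, 0.2]) # wrong dim once the collection exists
rescue RuntimeError => e
  puts e.message
end
```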

No payload indexes (deliberate)

Qdrant can index payload fields (PUT …/index) to speed up filtered operations. Every filtered call here (delete_by_source, scroll, filtered count) is an exact scan that works unindexed, and at pikuri's corpus scale (10³–10⁵ chunks) the scan takes milliseconds, so the client makes no index calls and stays smaller. Add them if a corpus ever outgrows that.

Testing posture

Same as Chroma: Faraday::Adapter::Test stubs verify the request shapes against the 1.x REST API but don’t catch real-Qdrant protocol drift. Targets Qdrant 1.8+ (the /exists probe is the newest endpoint used).

Constant Summary

MANIFEST_PAGE_SIZE = 1_000

Rows per /scroll page in #sources_with_hashes. Same rationale as Chroma::MANIFEST_PAGE_SIZE: caps the JSON burst of the boot manifest read at one round trip per this-many files.

RESERVED_PAYLOAD_KEYS = %w[id source text].freeze

Payload keys reserved for the Chunk's fixed fields. User metadata with these keys would collide; the Indexer never sets them.


Constructor Details

#initialize(host:, port:, collection:, connection: nil) ⇒ Qdrant

Parameters:

  • host (String)
  • port (Integer)
  • collection (String)

    collection name in Qdrant. Engine-specific identifier, so it lives here rather than on VectorDb::Extension, the same placement as Chroma's.

  • connection (Faraday::Connection, nil) (defaults to: nil)

    optional dependency-injection point for tests.

Raises:

  • (ArgumentError)

    on empty host or empty collection.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 142

def initialize(host:, port:, collection:, connection: nil)
  raise ArgumentError, 'host must be non-empty' if host.nil? || host.to_s.empty?
  raise ArgumentError, 'collection must be non-empty' if collection.nil? || collection.to_s.empty?

  @host = host
  @port = port
  @collection = collection
  @known_exists = false
  @connection = connection || Faraday.new(url: "http://#{host}:#{port}") do |f|
    f.request :json
    f.response :json
    f.adapter Faraday.default_adapter
  end
end

Instance Method Details

#count ⇒ Integer

Returns the current chunk count (exact). Zero before the first #upsert.

Returns:

  • (Integer)

    current chunk count (exact). Zero before the first #upsert.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 253

def count
  return 0 unless collection_exists?

  body = request_json(:post, "#{collection_path}/points/count", { exact: true })
  n = body.is_a?(Hash) ? body.dig('result', 'count') : nil
  raise "Backend::Qdrant: count response missing result.count (got #{body.inspect})" unless n.is_a?(Integer)

  n
end

#delete_all ⇒ void

This method returns an undefined value.

Drop the collection. Next #upsert re-creates from scratch (with that batch’s vector dim) — the v1 nuke-and-reload reindex path. No-op if no collection was ever created; 404 on the DELETE is treated as “already gone” — idempotent.

Raises:

  • (RuntimeError)

    on unexpected HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 238

def delete_all
  return nil unless collection_exists?

  response = @connection.delete(collection_path)
  unless [200, 404].include?(response.status)
    raise "Backend::Qdrant: DELETE #{collection_path} returned " \
          "HTTP #{response.status}: #{response.body.inspect}"
  end
  @known_exists = false
  nil
end

#delete_by_source(source) ⇒ void

This method returns an undefined value.

Remove every chunk whose source matches, via a payload-filtered points/delete. The scoped counterpart to #delete_all. No-op when the collection doesn’t exist yet.

Parameters:

  • source (String)

    the Chunk#source whose chunks are removed.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 271

def delete_by_source(source)
  return nil unless collection_exists?

  request_json(:post, "#{collection_path}/points/delete?wait=true",
               { filter: source_filter(source) })
  nil
end
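The private source_filter helper is not shown on this page. Assuming it mirrors the offset filter that #sources_with_hashes builds inline (a Qdrant `must` clause with an exact `match` on one payload key), the request bodies for #delete_by_source and #source_indexed? would look like this sketch:

```ruby
require 'json'

# Hypothetical reconstruction of the private helper, modelled on the
# { must: [{ key: 'offset', match: { value: 0 } }] } filter that
# #sources_with_hashes builds.
def source_filter(source)
  { must: [{ key: 'source', match: { value: source } }] }
end

# Body for POST /collections/{name}/points/delete?wait=true (#delete_by_source).
delete_body = { filter: source_filter('notes/todo.md') }

# Body for POST /collections/{name}/points/count (#source_indexed?):
# the same filter, plus an exact count.
count_body = { filter: source_filter('notes/todo.md'), exact: true }

puts JSON.generate(delete_body)
puts JSON.generate(count_body)
```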

#query(vector:, top_k:) ⇒ Array<Backend::Result>

k-NN query by cosine similarity. Returns at most top_k Results descending by score. Qdrant’s score is already a cosine similarity (higher = better), so it passes through unconverted — same scale as InMemory.

Parameters:

  • vector (Array<Float>)
  • top_k (Integer)

Returns:

  • (Array<Backend::Result>)

    at most top_k results, descending by score.

Raises:

  • (ArgumentError)

    on non-positive top_k.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 199

def query(vector:, top_k:)
  raise ArgumentError, "top_k must be positive (got #{top_k})" if top_k <= 0

  # Never upserted → no collection → semantic answer is
  # "no hits" (same short-circuit as Chroma).
  return [] unless collection_exists?

  body = request_json(:post, "#{collection_path}/points/search", {
                        vector: vector,
                        limit: top_k,
                        with_payload: true
                      })

  hits = body['result'] || []
  hits.map do |hit|
    payload = hit['payload'] || {}
    chunk_meta = {}
    payload.each do |k, v|
      next if RESERVED_PAYLOAD_KEYS.include?(k)

      chunk_meta[k.to_sym] = v
    end

    chunk = Chunk.new(
      id: payload['id'] || '', source: payload['source'] || '',
      text: payload['text'] || '', metadata: chunk_meta
    )
    Result.new(chunk: chunk, score: hit['score'].to_f)
  end
end

#replace_source(source:, chunks:, vectors:) ⇒ void

This method returns an undefined value.

Replace all chunks for one source: delete the old set, then upsert the new one. This is the incremental-reindex unit. It takes two HTTP calls, so it is not transactional; the divergence from InMemory, and its mitigation, are the same as documented on Chroma#replace_source.

Parameters:

  • source (String)

    the Chunk#source being replaced.

  • chunks (Array<Chunk>)

    the new chunk set.

  • vectors (Array<Array<Float>>)

    parallel to chunks.

Raises:

  • (ArgumentError)

    on empty input or length mismatch.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 291

def replace_source(source:, chunks:, vectors:)
  delete_by_source(source)
  upsert(chunks: chunks, vectors: vectors)
  nil
end
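Why two calls are not transactional can be shown with a hypothetical in-memory stand-in (`TwoStepStore` is illustrative, not a pikuri class). The delete takes effect on its own, so if the upsert then fails, the source is left with no chunks at all:

```ruby
# Hypothetical in-memory stand-in with the same two-step shape as
# Backend::Qdrant#replace_source: delete_by_source, then upsert.
class TwoStepStore
  attr_reader :chunks

  def initialize(fail_upsert: false)
    @chunks = {} # chunk id => source
    @fail_upsert = fail_upsert
  end

  def delete_by_source(source)
    @chunks.delete_if { |_id, src| src == source }
    nil
  end

  def upsert(ids, source)
    raise 'simulated HTTP failure on upsert' if @fail_upsert

    ids.each { |id| @chunks[id] = source }
    nil
  end

  def replace_source(source, ids)
    delete_by_source(source) # call 1 takes effect on its own...
    upsert(ids, source)      # ...so a failure here cannot undo it
    nil
  end
end

ok = TwoStepStore.new
ok.chunks['a.md:0'] = 'a.md'
ok.replace_source('a.md', ['a.md:0', 'a.md:100'])

broken = TwoStepStore.new(fail_upsert: true)
broken.chunks['a.md:0'] = 'a.md'
begin
  broken.replace_source('a.md', ['a.md:0', 'a.md:100'])
rescue RuntimeError => e
  warn "replace_source failed: #{e.message}"
end
```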

#source_indexed?(source) ⇒ Boolean

Is source in the corpus? A payload-filtered exact count — O(1) transport regardless of corpus size, never the full #sources_with_hashes manifest. See the Backend protocol yardoc.

Parameters:

  • source (String)

Returns:

  • (Boolean)

    true if at least one chunk has this source.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 348

def source_indexed?(source)
  return false unless collection_exists?

  body = request_json(:post, "#{collection_path}/points/count",
                      { filter: source_filter(source), exact: true })
  n = body.is_a?(Hash) ? body.dig('result', 'count') : nil
  n.is_a?(Integer) && n.positive?
end

#sources_with_hashes ⇒ Hash{String => String, nil}

The boot-sweep reference: source → stored content hash for every indexed document. Reads one payload row per file (filter offset == 0 — every file has exactly one chunk at offset 0), projected to the two payload keys needed, vectors excluded, paged by Qdrant’s own scroll cursor (next_page_offset — no client-side offset arithmetic, unlike Chroma).

Pagination assumes the manifest isn’t mutating mid-read; the Watcher drives this from its single worker thread.

Returns:

  • (Hash{String => String, nil})

    source → content hash. Empty when the collection doesn’t exist yet.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 311

def sources_with_hashes
  return {} unless collection_exists?

  result = {}
  cursor = nil
  loop do
    request = {
      filter: { must: [{ key: 'offset', match: { value: 0 } }] },
      with_payload: %w[source hash],
      with_vector: false,
      limit: MANIFEST_PAGE_SIZE
    }
    request[:offset] = cursor unless cursor.nil?

    body = request_json(:post, "#{collection_path}/points/scroll", request)
    points = body.is_a?(Hash) ? (body.dig('result', 'points') || []) : []
    points.each do |point|
      payload = point['payload']
      next unless payload.is_a?(Hash) && payload['source']

      result[payload['source']] = payload['hash']
    end

    cursor = body.is_a?(Hash) ? body.dig('result', 'next_page_offset') : nil
    break if cursor.nil?
  end
  result
end
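The cursor loop can be exercised without a server. The sketch below swaps the HTTP call for a hypothetical `fake_scroll` that serves pre-filtered rows in pages and returns `next_page_offset` as nil on the last page. Real Qdrant returns a point id as the cursor; the loop treats the cursor as opaque, so an array index works just as well here:

```ruby
# Hypothetical stand-in for POST /collections/{name}/points/scroll.
# Ignores the filter (rows are assumed pre-filtered to offset == 0) and
# uses an array index as the opaque cursor.
def fake_scroll(rows, request)
  start = request[:offset] || 0
  page = rows[start, request[:limit]] || []
  nxt = start + request[:limit]
  { 'result' => { 'points' => page, 'next_page_offset' => (nxt < rows.size ? nxt : nil) } }
end

# Same loop shape as #sources_with_hashes; also counts round trips.
def manifest_sketch(rows, page_size)
  result = {}
  cursor = nil
  pages = 0
  loop do
    request = { limit: page_size }
    request[:offset] = cursor unless cursor.nil?
    body = fake_scroll(rows, request)
    pages += 1
    body.dig('result', 'points').each do |point|
      payload = point['payload']
      next unless payload.is_a?(Hash) && payload['source']

      result[payload['source']] = payload['hash']
    end
    cursor = body.dig('result', 'next_page_offset')
    break if cursor.nil?
  end
  [result, pages]
end

rows = (1..5).map { |i| { 'payload' => { 'source' => "doc#{i}.md", 'hash' => "h#{i}" } } }
manifest, round_trips = manifest_sketch(rows, 2)
p manifest
p round_trips
```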

#upsert(chunks:, vectors:) ⇒ void

This method returns an undefined value.

Insert-or-replace by chunk.id (via the derived point UUID — see the class header). Parallel arrays of equal length; raises on empty input or length mismatch (same contract as InMemory). Creates the collection on first use, fixing the vector dim to this batch’s; mismatched dims later surface as RuntimeError from a 4xx response.

Parameters:

  • chunks (Array<Chunk>)
  • vectors (Array<Array<Float>>)

Raises:

  • (ArgumentError)

    on empty input or length mismatch.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/qdrant.rb', line 170

def upsert(chunks:, vectors:)
  raise ArgumentError, 'upsert called with empty chunks/vectors' if chunks.empty?
  if chunks.size != vectors.size
    raise ArgumentError, "size mismatch: #{chunks.size} chunks vs #{vectors.size} vectors"
  end

  ensure_collection!(dim: vectors.first.size)

  points = chunks.each_with_index.map do |c, i|
    payload = { 'id' => c.id, 'source' => c.source, 'text' => c.text }
    c.metadata.each { |k, v| payload[k.to_s] = v }
    { id: point_id(c.id), vector: vectors[i], payload: payload }
  end

  request_json(:put, "#{collection_path}/points?wait=true", { points: points })
  nil
end