Class: Pikuri::VectorDb::Backend::Chroma

Inherits:
Object
Defined in:
lib/pikuri/vector_db/backend/chroma.rb

Overview

Thin Faraday HTTP client against a self-hosted Chroma server (v2 API). This is the persistent backend, behind the same duck-typed Pikuri::VectorDb::Backend protocol as InMemory: same method names, same return shapes, same ArgumentError contract on empty input and non-positive top_k. The two diverge on the vector-dim contract; see below.

The client is hand-rolled rather than a dependency on a chroma-db gem: only a handful of v2 endpoints are needed (listed below), Faraday is already in the dependency closure, and a thin first-party client keeps the wire protocol auditable in one readable file — consistent with the read-it-in-an-evening ceiling. The cost is tracking Chroma’s v2 API by hand if it changes.

Two ways to get one

  • **Bring your own.** Backend::Chroma.new(host:, port:, collection:) against an existing Chroma deployment (production cluster, docker-compose stack, a Chroma instance already running on the host for an unrelated project). The host owns the process; this class is purely the HTTP client.

  • **Let pikuri manage it.** Server::Chroma.ensure_running spawns and supervises a Chroma container named pikuri-internal-chroma, from a pinned image, with a bind-mounted volume in the user’s cache dir. Server::Chroma#client(collection:) returns a Backend::Chroma pre-pointed at the supervised container. The split is deliberate: docker lifecycle and HTTP wire protocol have nothing in common, so each lives in its own class.

Chroma v2 API

Endpoints used:

  • POST /api/v2/tenants/{tenant}/databases/{db}/collections with get_or_create: true — idempotent collection creation. Returns {id, name, …}.

  • POST /api/v2/…/collections/{id}/upsert — insert or replace by id. Body carries parallel arrays of ids, embeddings, documents, metadatas.

  • POST /api/v2/…/collections/{id}/query — k-NN search. Body: {query_embeddings, n_results, include}.

  • GET /api/v2/…/collections/{id}/count — integer count.

  • DELETE /api/v2/…/collections/{id} — drop the collection (used by #delete_all).

  • POST /api/v2/…/collections/{id}/delete — metadata-filtered delete (where:); used by #delete_by_source.

  • POST /api/v2/…/collections/{id}/get — fetch rows by {where:} filter with an include: projection; used by #sources_with_hashes.
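The upsert and query bodies named in the list above can be sketched as plain Hashes. This is a minimal illustration rather than the class's own code: the helper names upsert_body and query_body and the sample ids and vectors are assumptions made for the example.

```ruby
require 'json'

# Hypothetical helper: builds the parallel-array body for the upsert
# endpoint. Index i of every array describes the same row.
def upsert_body(ids:, embeddings:, documents:, metadatas:)
  sizes = [ids, embeddings, documents, metadatas].map(&:size).uniq
  raise ArgumentError, 'parallel arrays must have equal length' unless sizes.size == 1

  { ids: ids, embeddings: embeddings, documents: documents, metadatas: metadatas }
end

# Hypothetical helper: builds a k-NN query body for one query vector.
# query_embeddings is an array of vectors, so the single vector is wrapped.
def query_body(vector:, top_k:)
  { query_embeddings: [vector], n_results: top_k, include: %w[documents metadatas distances] }
end

body = upsert_body(
  ids: ['notes.md:0', 'notes.md:1'],          # sample ids, format assumed
  embeddings: [[0.1, 0.2], [0.3, 0.4]],       # pre-computed vectors
  documents: ['first chunk', 'second chunk'],
  metadatas: [{ 'source' => 'notes.md' }, { 'source' => 'notes.md' }]
)
puts JSON.generate(body)
puts JSON.generate(query_body(vector: [0.1, 0.2], top_k: 5))
```

Symbol keys in these Hashes serialize to JSON strings, so the wire format is the same as if the keys had been written as Strings.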

BYO embeddings (not Chroma’s embedder)

Chroma collections can carry an embedding function in their metadata — Chroma’s term for what pikuri calls an Embedder. When configured, add / query accept raw text via documents / query_texts and Chroma embeds server-side. We deliberately don’t use this: pikuri’s Embedder is the one source of truth for embedder choice, the provider-cliff visibility lives in pikuri’s config, and a parallel Chroma-side embedder config would split the truth without pikuri noticing (e.g. local embedder in pikuri + OpenAIEmbeddingFunction in Chroma — every indexed document silently lands at OpenAI). We always send pre-computed embeddings; Chroma’s collection embedder is never invoked.

Vector-dim contract diverges from InMemory

InMemory enforces vector-dim consistency client-side (locks on first upsert, raises ArgumentError on mismatch). Chroma enforces server-side — first upsert to a collection establishes the dim; mismatched subsequent upserts produce HTTP 4xx which propagates as RuntimeError. Different exception class, same loud-failure shape. Documented divergence; not worth parsing Chroma’s error envelope to coerce to ArgumentError.
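For a caller that wants to treat both backends alike, the divergence comes down to rescuing two exception classes. The sketch below uses two hypothetical stand-in classes (FakeInMemory, FakeChroma) and a hypothetical safe_upsert helper; none of them are part of pikuri, and the stand-ins only imitate the failure shapes described above.

```ruby
# Stand-in for InMemory: locks the dimension on the first vector it
# sees and checks every later vector client-side.
class FakeInMemory
  def upsert(vectors:)
    vectors.each do |v|
      @dim ||= v.size
      raise ArgumentError, "dim mismatch: expected #{@dim}, got #{v.size}" unless v.size == @dim
    end
    nil
  end
end

# Stand-in for the Chroma backend: the "server" rejects the request and
# the client surfaces the 4xx as a RuntimeError (raise with a String).
class FakeChroma
  def upsert(vectors:)
    vectors.each do |v|
      @dim ||= v.size
      raise "HTTP 4xx: dimension mismatch (expected #{@dim}, got #{v.size})" unless v.size == @dim
    end
    nil
  end
end

# Hypothetical caller-side wrapper: returns :ok, or the class of the
# dimension error whichever backend raised it.
def safe_upsert(backend, vectors)
  backend.upsert(vectors: vectors)
  :ok
rescue ArgumentError, RuntimeError => e
  e.class
end

p safe_upsert(FakeInMemory.new, [[1.0, 2.0], [1.0]])
p safe_upsert(FakeChroma.new, [[1.0, 2.0], [1.0]])
```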

Lazy collection resolution

Backend::Chroma.new doesn’t talk to the server. The first #upsert / #query / #count call resolves (and creates if missing) the collection by name, caches the id, and uses it thereafter. #delete_all drops the collection and clears the cached id; the next #upsert re-creates from scratch.
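The resolve-once-then-cache behaviour can be sketched without any HTTP. The LazyCollection class below is a hypothetical stand-in written for this example: the block passed to it plays the role of the get_or_create request, and reset! plays the role of the cache clearing that #delete_all performs.

```ruby
# Hypothetical stand-in for the backend's collection handling.
class LazyCollection
  attr_reader :resolve_calls

  def initialize(name, &resolver)
    @name = name
    @resolver = resolver
    @collection_id = nil
    @resolve_calls = 0 # counts simulated server round trips
  end

  # Resolve on first use, then serve the cached id.
  def collection_id
    @collection_id ||= begin
      @resolve_calls += 1
      @resolver.call(@name)
    end
  end

  # Forget the id so the next use resolves again.
  def reset!
    @collection_id = nil
  end
end

col = LazyCollection.new('docs') { |name| "id-for-#{name}" }
puts col.resolve_calls # construction alone made no call
col.collection_id
col.collection_id
puts col.resolve_calls # two reads, one resolution
col.reset!
col.collection_id
puts col.resolve_calls # resolved again after the reset
```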

Cosine distance (matches InMemory)

The collection is created with hnsw.space: 'cosine'. Chroma returns cosine distance (range [0, 2] where 0 = identical, 1 = orthogonal); #query converts to similarity via 1 - distance so the Result score has the same meaning across backends.
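The distance-to-score conversion is small enough to check by hand. The helpers below are an illustrative sketch with assumed names (cosine_similarity, cosine_distance, score_from_distance); only the 1 - distance step corresponds to what #query does.

```ruby
# Cosine similarity of two equal-length, non-zero vectors.
def cosine_similarity(a, b)
  dot = a.zip(b).sum { |x, y| x * y }
  norm = Math.sqrt(a.sum { |x| x * x }) * Math.sqrt(b.sum { |x| x * x })
  dot / norm
end

# Cosine distance: 0 = identical direction, 1 = orthogonal, 2 = opposite.
def cosine_distance(a, b)
  1.0 - cosine_similarity(a, b)
end

# The conversion applied to each returned distance.
def score_from_distance(distance)
  1.0 - distance.to_f
end

same       = cosine_distance([1.0, 0.0], [1.0, 0.0])
orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])
opposite   = cosine_distance([1.0, 0.0], [-1.0, 0.0])

[same, orthogonal, opposite].each do |d|
  puts "distance=#{d} score=#{score_from_distance(d)}"
end
```

Scores therefore span [-1, 1], with higher meaning more similar, which is the cosine-similarity scale the text attributes to InMemory.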

Metadata key normalization

Chroma serializes through JSON, so Symbol metadata keys become Strings on round-trip. #upsert converts the incoming Chunk’s metadata keys to Strings before sending; #query converts them back to Symbols on the way out, so the Chunk a caller pulls from a query looks identical to one stored in InMemory. source rides as a special metadata key (Chroma has no native source concept).
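The round trip can be reproduced with the json stdlib alone. The helper names to_wire and from_wire and the sample metadata are assumptions for this sketch; the JSON generate/parse pair stands in for the trip to the server and back.

```ruby
require 'json'

# Stringify keys before sending and add the reserved 'source' key.
def to_wire(source, metadata)
  wire = { 'source' => source }
  metadata.each { |k, v| wire[k.to_s] = v }
  wire
end

# Pull 'source' back out and symbolize the remaining keys.
def from_wire(wire)
  source = wire['source'] || ''
  metadata = wire.reject { |k, _| k == 'source' }.transform_keys(&:to_sym)
  [source, metadata]
end

original = { offset: 0, hash: 'abc123' } # sample metadata
wire = JSON.parse(JSON.generate(to_wire('notes.md', original)))
source, metadata = from_wire(wire)

p wire.keys         # String keys after the JSON round trip
p source
p metadata == original
```

Only top-level keys are converted in this sketch; a nested Hash value would come back with String keys.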

Testing posture

Specs use Faraday::Adapter::Test stubs only — they verify “we send what we think we’re sending” against the v2 API shape but don’t catch real-Chroma protocol drift. Real-Chroma smoke testing is wired into the demo binary in a later phase. Targets Chroma 0.5.x+ (v2 API).

Constant Summary

MANIFEST_PAGE_SIZE =

Rows per /get page in #sources_with_hashes. Caps the JSON burst + parse working set of the boot manifest read on a large corpus; small corpora finish in one page. Chunky but not arbitrary — one round trip per this-many files, and the manifest is one row per file (the offset 0 chunk), so a 50k-file corpus is ~50 localhost round trips instead of one multi-MB response.

1_000

Instance Method Summary

Constructor Details

#initialize(host:, port:, collection:, tenant: 'default_tenant', database: 'default_database', connection: nil) ⇒ Chroma

Parameters:

  • host (String)
  • port (Integer)
  • collection (String)

    collection name in Chroma. This is a Chroma-specific identifier, so it lives here rather than on VectorDb::Extension (where it’d be a no-op for Backend::InMemory).

  • tenant (String) (defaults to: 'default_tenant')

    Chroma v2 tenant; defaults to Chroma’s own default.

  • database (String) (defaults to: 'default_database')

    Chroma v2 database; defaults to Chroma’s own default.

  • connection (Faraday::Connection, nil) (defaults to: nil)

    optional dependency-injection point for tests.

Raises:

  • (ArgumentError)

    on empty host or empty collection.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 150

def initialize(host:, port:, collection:,
               tenant: 'default_tenant',
               database: 'default_database',
               connection: nil)
  raise ArgumentError, 'host must be non-empty' if host.nil? || host.to_s.empty?
  raise ArgumentError, 'collection must be non-empty' if collection.nil? || collection.to_s.empty?

  @host = host
  @port = port
  @collection_name = collection
  @tenant = tenant
  @database = database
  @collection_id = nil
  @connection = connection || Faraday.new(url: "http://#{host}:#{port}") do |f|
    f.request :json
    f.response :json
    f.adapter Faraday.default_adapter
  end
end

Instance Method Details

#count ⇒ Integer

Returns current chunk count. Zero before the first #upsert.

Returns:

  • (Integer)

    current chunk count. Zero before the first #upsert.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 279

def count
  return 0 if @collection_id.nil? && !collection_exists?

  response = @connection.get("#{collection_path}/count")
  unless response.status == 200
    raise "Backend::Chroma: GET #{collection_path}/count returned " \
          "HTTP #{response.status}: #{response.body.inspect}"
  end

  body = response.body
  # Chroma v2 returns the count as a bare integer.
  return body if body.is_a?(Integer)
  return body['count'] if body.is_a?(Hash) && body['count'].is_a?(Integer)

  raise "Backend::Chroma: count response was not an Integer (got #{body.inspect})"
end

#delete_all ⇒ void

This method returns an undefined value.

Drop the collection. The next #upsert re-creates it from scratch; that’s the v1 nuke-and-reload reindex path the Indexer drives. No-op if no collection was ever created (consistent with InMemory’s clear-on-empty behaviour). A 404 on the DELETE is treated as “already gone”, so the call is idempotent.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 265

def delete_all
  return nil if @collection_id.nil? && !collection_exists?

  response = @connection.delete(collection_path)
  unless [200, 204, 404].include?(response.status)
    raise "Backend::Chroma: DELETE #{collection_path} returned " \
          "HTTP #{response.status}: #{response.body.inspect}"
  end
  @collection_id = nil
  nil
end

#delete_by_source(source) ⇒ void

This method returns an undefined value.

Remove every chunk whose source matches, via a metadata-filtered POST …/delete (source is the reserved metadata key #upsert writes). The scoped counterpart to #delete_all. No-op when the collection doesn’t exist yet.

Parameters:

  • source (String)

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 305

def delete_by_source(source)
  return nil if @collection_id.nil? && !collection_exists?

  post_json("#{collection_path}/delete", { where: { 'source' => source } })
  nil
end

#query(vector:, top_k:) ⇒ Array<Backend::Result>

k-NN query by cosine similarity. Returns at most top_k Results descending by score. score is 1 - cosine_distance so the value matches InMemory’s cosine-similarity scale.

Parameters:

  • vector (Array<Float>)
  • top_k (Integer)

Returns:

  • (Array<Backend::Result>)

    at most top_k results, descending by score.

Raises:

  • (ArgumentError)

    on non-positive top_k.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 221

def query(vector:, top_k:)
  raise ArgumentError, "top_k must be positive (got #{top_k})" if top_k <= 0

  # If we've never upserted, the collection doesn't
  # exist yet — semantic answer is "no hits."
  return [] if @collection_id.nil? && !collection_exists?

  response_body = post_json("#{collection_path}/query", {
                              query_embeddings: [vector],
                              n_results: top_k,
                              include: %w[documents metadatas distances]
                            })

  ids = (response_body['ids']       || [[]]).first || []
  docs = (response_body['documents'] || [[]]).first || []
  metas = (response_body['metadatas'] || [[]]).first || []
  dists = (response_body['distances'] || [[]]).first || []

  ids.each_with_index.map do |id, i|
    meta = metas[i] || {}
    # Pull +source+ back out of the metadata blob;
    # symbolize the remaining keys for round-trip
    # consistency with InMemory.
    source = meta['source'] || ''
    chunk_meta = {}
    meta.each do |k, v|
      next if k == 'source'

      chunk_meta[k.to_sym] = v
    end

    chunk = Chunk.new(id: id, source: source, text: docs[i] || '', metadata: chunk_meta)
    Result.new(chunk: chunk, score: 1.0 - dists[i].to_f)
  end
end

#replace_source(source:, chunks:, vectors:) ⇒ void

This method returns an undefined value.

Replace all chunks for one source: delete the old set, then upsert the new one. The incremental-reindex unit (see Indexer#reindex_file!).

Not transactional (the InMemory divergence)

These are two HTTP calls, so a #query landing between them can see the source with zero chunks — a window InMemory#replace_source closes with its monitor but Chroma cannot, short of server-side transactions it doesn’t expose. The window is small and the Indexer mitigates the common failure: it embeds before calling here, so an embedder outage never reaches this method and the old chunks stay put. Delete-then-upsert (not the reverse): upserting first then deleting by source would delete the just-written chunks.
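The ordering argument can be checked against a toy store. FakeStore below is a hypothetical in-memory stand-in, and the 'a.md:0' id format is an assumption made for the example; the point is only that chunks written first and then deleted by source do not survive.

```ruby
# Hypothetical stand-in for a collection: id => { source:, text: }.
class FakeStore
  def initialize
    @rows = {}
  end

  # Insert or replace by id.
  def upsert(rows)
    rows.each { |id, row| @rows[id] = row }
  end

  # Remove every row whose source matches.
  def delete_by_source(source)
    @rows.delete_if { |_, row| row[:source] == source }
  end

  def ids_for(source)
    @rows.select { |_, row| row[:source] == source }.keys
  end
end

old_rows = {
  'a.md:0' => { source: 'a.md', text: 'old first' },
  'a.md:1' => { source: 'a.md', text: 'old second' }
}
new_rows = { 'a.md:0' => { source: 'a.md', text: 'new first' } }

# Delete-then-upsert: the stale 'a.md:1' row is gone, the new row stays.
good = FakeStore.new
good.upsert(old_rows)
good.delete_by_source('a.md')
good.upsert(new_rows)
p good.ids_for('a.md')

# Upsert-then-delete: the delete also removes the rows just written.
bad = FakeStore.new
bad.upsert(old_rows)
bad.upsert(new_rows)
bad.delete_by_source('a.md')
p bad.ids_for('a.md')
```

A plain upsert without any delete would not work either: it would leave 'a.md:1' behind when the new chunk set is shorter than the old one.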

Parameters:

  • source (String)

    the Chunk#source being replaced.

  • chunks (Array<Chunk>)

    the new chunk set.

  • vectors (Array<Array<Float>>)

    parallel to chunks.

Raises:

  • (ArgumentError)

    on empty input or length mismatch.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 335

def replace_source(source:, chunks:, vectors:)
  delete_by_source(source)
  upsert(chunks: chunks, vectors: vectors)
  nil
end

#source_indexed?(source) ⇒ Boolean

Is source in the corpus? A scoped existence check for Tools::Read’s membership gate: a where-filtered /get capped at one row, with include: [] so the response carries only ids. Transport cost is O(1) regardless of corpus size, and the full #sources_with_hashes manifest is never read. See the Backend protocol yardoc.

Parameters:

  • source (String)

Returns:

  • (Boolean)

    true if at least one chunk has this source.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 399

def source_indexed?(source)
  return false if @collection_id.nil? && !collection_exists?

  body = post_json("#{collection_path}/get", {
                     where: { 'source' => source },
                     include: [],
                     limit: 1
                   })
  ids = body.is_a?(Hash) ? (body['ids'] || []) : []
  !ids.empty?
end

#sources_with_hashes ⇒ Hash{String => String, nil}

The boot-sweep reference: source → stored content hash for every indexed document. Reads one metadata row per file, not per chunk, via three Chroma /get knobs:

  • where: { offset: 0 } — every file has exactly one chunk at offset 0, so this returns one row per source.

  • include: ['metadatas'] drops the heavy embeddings and documents from the response; we pull only the metadata projection, never the vectors.

  • limit / offset — page the read in MANIFEST_PAGE_SIZE chunks so a large corpus never materializes one multi-MB response. (Two unrelated offsets collide in the wording: the where offset is a chunk metadata field; the top-level offset is the pagination cursor — different namespaces in the API.)

Pagination assumes the manifest isn’t mutating mid-read; the Watcher drives this from its single worker thread, so no reindex runs concurrently with the boot sweep that calls it.
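The paging loop's stopping rule (stop on the first page shorter than the page size) can be sketched against an Array standing in for the server. PAGE_SIZE, fetch_page and read_all are hypothetical names for this example, and the tiny page size is chosen only to keep the output readable.

```ruby
# Small page size for the sketch; the class uses MANIFEST_PAGE_SIZE.
PAGE_SIZE = 3

# Stand-in for the paged /get call: up to `limit` rows from `offset`.
def fetch_page(rows, limit:, offset:)
  rows[offset, limit] || []
end

# Reads every row page by page and counts the simulated round trips.
def read_all(rows, page_size: PAGE_SIZE)
  result = []
  requests = 0
  cursor = 0
  loop do
    page = fetch_page(rows, limit: page_size, offset: cursor)
    requests += 1
    result.concat(page)
    break if page.size < page_size # short page: nothing left to read

    cursor += page.size
  end
  [result, requests]
end

rows, requests = read_all((1..7).to_a)
puts "#{rows.size} rows in #{requests} requests"

rows, requests = read_all((1..6).to_a)
puts "#{rows.size} rows in #{requests} requests"
```

With this stopping rule, a row count that is an exact multiple of the page size costs one extra request that returns an empty page, since only a short page signals the end.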

Returns:

  • (Hash{String => String, nil})

    source → content hash. Empty when the collection doesn’t exist yet.

Raises:

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 364

def sources_with_hashes
  return {} if @collection_id.nil? && !collection_exists?

  result = {}
  cursor = 0
  loop do
    body = post_json("#{collection_path}/get", {
                       where: { 'offset' => 0 },
                       include: ['metadatas'],
                       limit: MANIFEST_PAGE_SIZE,
                       offset: cursor
                     })
    metas = body.is_a?(Hash) ? (body['metadatas'] || []) : []
    metas.each do |meta|
      next unless meta.is_a?(Hash) && meta['source']

      result[meta['source']] = meta['hash']
    end
    break if metas.size < MANIFEST_PAGE_SIZE

    cursor += metas.size
  end
  result
end

#upsert(chunks:, vectors:) ⇒ void

This method returns an undefined value.

Insert-or-replace by chunk.id. chunks and vectors are parallel arrays of equal length; raises on empty input or length mismatch (same contract as InMemory). The Chroma server enforces vector-dim consistency; mismatched dims surface as RuntimeError from a 4xx response (the InMemory backend raises ArgumentError for the same case, a documented divergence).

Parameters:

  • chunks (Array<Chunk>)
  • vectors (Array<Array<Float>>)

Raises:

  • (ArgumentError)

    on empty input or length mismatch.

  • (RuntimeError)

    on HTTP failure.



# File 'lib/pikuri/vector_db/backend/chroma.rb', line 183

def upsert(chunks:, vectors:)
  raise ArgumentError, 'upsert called with empty chunks/vectors' if chunks.empty?
  if chunks.size != vectors.size
    raise ArgumentError, "size mismatch: #{chunks.size} chunks vs #{vectors.size} vectors"
  end

  ensure_collection!

  metadatas = chunks.map do |c|
    # Serialize +source+ as a reserved key in Chroma's
    # +metadata+; merge in the user's metadata Hash with
    # keys stringified for JSON round-trip stability.
    base = { 'source' => c.source }
    c.metadata.each { |k, v| base[k.to_s] = v }
    base
  end

  body = {
    ids: chunks.map(&:id),
    embeddings: vectors,
    documents: chunks.map(&:text),
    metadatas: metadatas
  }

  post_json("#{collection_path}/upsert", body)
  nil
end