Class: Pikuri::VectorDb::Watcher

Inherits:
Object
Defined in:
lib/pikuri/vector_db/watcher.rb

Overview

The auto-watch daemon keeps a persistent index in sync with a corpus on disk, so the user never has to call vectordb_reindex. It is a thin layer over the listen gem that drives the Indexer’s per-file entry points (Indexer#reindex_file! / Indexer#remove_file!) from filesystem events, plus a boot reconciliation sweep (Indexer#reconcile_plan). This class is the auto-watch half of the incremental-reindex + auto-watch feature; the per-file methods it drives live on Indexer and Backend.

Host-owned, not agent-owned

A web app has many Agents but one corpus and one Watcher, so the Watcher’s lifetime is the host process, not any single agent. The host constructs it once, around the same Indexer (and therefore the same Backend) that every agent’s Extension shares, and calls #start / #close on it as part of the process lifecycle:

server = Pikuri::VectorDb::Server::Chroma.ensure_running
backend = server.client(collection: 'notes')
ext = Pikuri::VectorDb::Extension.new(backend: backend, source: '~/notes')
# ... build agents with `ext` ...
watcher = Pikuri::VectorDb::Watcher.new(indexer: ext.indexer).start

The Extension indexes nothing on its own, so the Watcher is the population path here: its boot reconcile sweep (Indexer#reconcile_plan) fills an empty backend and is uniform across backends. A host that also indexes some other way (an explicit ext.indexer.index_if_empty! before start) is fine too — the sweep then finds matching hashes and reindexes nothing.
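The sweep is cheap to re-run because it compares content hashes: files whose stored hash matches are skipped. Here is a minimal sketch of such a plan. It is not the actual Indexer#reconcile_plan. The method name reconcile_plan_sketch, its bytes-in / hashes-in inputs, and the use of SHA-256 in place of Indexer#file_hash are all assumptions for illustration.

```ruby
require 'digest'

# Hypothetical sketch, not Indexer#reconcile_plan. SHA-256 stands in for
# whatever Indexer#file_hash actually computes.
#
# on_disk: { path => file bytes } found by the boot walk
# stored:  { path => hash } recorded in the backend at last index time
def reconcile_plan_sketch(on_disk, stored)
  # Reindex anything new, or anything whose bytes changed since it was stored.
  reindex = on_disk.filter_map do |path, bytes|
    path if stored[path] != Digest::SHA256.hexdigest(bytes)
  end
  # Remove anything the backend still holds that is no longer on disk.
  remove = stored.keys - on_disk.keys
  { reindex: reindex.sort, remove: remove.sort }
end

files = { 'a.md' => 'alpha', 'b.md' => 'beta' }
empty_plan = reconcile_plan_sketch(files, {}) # empty backend: both files reindexed
stored = files.transform_values { |b| Digest::SHA256.hexdigest(b) }
noop_plan = reconcile_plan_sketch(files, stored) # matching hashes: nothing to do
```

This is why an explicit index_if_empty! before #start costs only a hash per file on the following sweep.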

Single work queue, last-intent-per-path

The boot sweep and the live event stream must not race. They funnel into one @pending map (path → :reindex/:remove) drained by one worker thread, so:

  • Rapid edits to one file collapse to a single reindex (the map holds one intent per path); listen’s latency window coalesces most of a burst before it even reaches the Watcher.

  • Order doesn’t corrupt content: #process resolves the file from disk *at process time*, and Indexer#reindex_file! of a vanished file is itself a removal — so a stale :reindex that runs after a delete self-corrects rather than writing old bytes.

The boot sweep is enqueued by the worker thread after the listener is already subscribed, so a change made mid-sweep lands in the same queue rather than being lost.
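The last-intent-per-path behavior falls out of keying a Hash by path. Below is a minimal sketch, assuming a Monitor-guarded map like the @pending / @monitor pair set up in #initialize. The class PendingQueueSketch and its methods are hypothetical.

```ruby
require 'monitor'

# Hypothetical sketch of the single work queue: one intent per path.
class PendingQueueSketch
  def initialize
    @pending = {} # path (String) => :reindex / :remove
    @monitor = Monitor.new
  end

  # A later intent for the same path overwrites the earlier one.
  def enqueue(action, path)
    @monitor.synchronize { @pending[path] = action }
  end

  # Pop the oldest-inserted path with its latest intent, or nil when empty.
  def pop
    @monitor.synchronize do
      next nil if @pending.empty?

      key = @pending.keys.first
      [key, @pending.delete(key)]
    end
  end
end

q = PendingQueueSketch.new
q.enqueue(:reindex, '/c/a.md')
q.enqueue(:reindex, '/c/a.md') # collapses into the existing entry
q.enqueue(:reindex, '/c/b.md')
q.enqueue(:remove, '/c/a.md')  # last intent wins
```

Ruby keeps a key's original insertion position when its value is reassigned. As a result, a path that keeps getting edited stays in its place in line, and only its pending action changes.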

Teardown drains the in-flight job

The Watcher registers with Finalizers last (after the host’s Server::Chroma), so LIFO teardown closes it first, before the container is removed with docker rm -f. #close stops the listener (no new events), then joins the worker, which finishes the job it is currently running and exits. That ordering is load-bearing: it guarantees that no Backend#replace_source is still writing when the Chroma container goes down. Queued jobs that have not started yet are abandoned; the next boot’s reconcile sweep catches them (the downtime-gap guarantee).
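The teardown order depends only on the registry being LIFO. Below is a minimal sketch of such a registry, modeled on the register / unregister calls the Watcher makes. FinalizerRegistrySketch is hypothetical, and the real Pikuri::Finalizers may be implemented differently.

```ruby
# Hypothetical LIFO registry; not the real Pikuri::Finalizers.
class FinalizerRegistrySketch
  def initialize
    @entries = []     # [handle, closeable] pairs in registration order
    @next_handle = 0
  end

  # Returns an opaque handle that the caller can pass to #unregister.
  def register(closeable)
    @next_handle += 1
    @entries << [@next_handle, closeable]
    @next_handle
  end

  def unregister(handle)
    @entries.reject! { |h, _| h == handle }
    nil
  end

  # Last registered, first closed. The list is detached before iterating, so
  # a #close that calls #unregister (as Watcher#close does) is a no-op.
  def close_all
    entries = @entries.reverse
    @entries = []
    entries.each { |_, closeable| closeable.close }
  end
end
```

The host registers Server::Chroma before it calls Watcher#start. The Watcher's entry therefore sits at the end of the list and is closed first.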

What it deliberately does not do

  • **No model/parameter awareness.** The change signal is the file’s byte hash (Indexer#file_hash), so swapping the embedder or chunker config does not trigger reindex — the recourse is a manual full vectordb_reindex. See Extension for the loud warning.

  • **Symlinks are never indexed.** A live event whose path is reached through a symlink below the watch root is dropped: the leaf may be a symlinked file, or an ancestor may be a symlinked directory. This matches Indexer#list_files, which skips the same paths on the boot walk, so the watch and the walk index exactly the same set. The rule is deliberately strict rather than following the OS default, for two reasons. First, it is a privacy boundary: a link pointing out of the corpus never gets embedded. Second, it closes an off-tree divergence: a symlink target that changes outside the watched tree fires no event, so following the link would leave stale chunks that the next walk would then remove.

  • **No backpressure.** Re-embedding competes with the chat model on a shared llama-server; vectordb targets low-churn corpora (notes, docs), so a burst is tolerated rather than throttled.
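Here is a minimal sketch of the symlink rule. It assumes the check walks from the leaf up to, but not including, the watch root, and tests each component with lstat semantics (File.symlink? does not follow the final link). The name symlinked_below_root_sketch? is hypothetical; the Watcher's own check may differ. The sketch also assumes the caller has already dropped paths outside the root, as indexable? does.

```ruby
require 'pathname'

# Hypothetical sketch: true if the leaf or any ancestor strictly below root
# is a symlink. expand_path normalizes the path without resolving links.
def symlinked_below_root_sketch?(path, root)
  root = Pathname.new(root).expand_path
  Pathname.new(path).expand_path.ascend do |component|
    break if component == root            # never inspect the root or above
    return true if component.symlink?     # lstat: the link itself, not its target
  end
  false
end
```

Stopping at the root matters when the root itself lives under a symlinked system directory (for example /tmp on macOS), which must not disqualify every file.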

Constant Summary

LOGGER = Pikuri.logger_for('VectorDb::Watcher')

Instance Method Summary

Constructor Details

#initialize(indexer:, latency: 10.0, listen_factory: nil) ⇒ Watcher

Parameters:

  • indexer (Indexer)

    the indexer to drive — supplies the watch Indexer#root, the source-vs-directory shape via Indexer#source, and the per-file + reconcile entry points.

  • latency (Float) (defaults to: 10.0)

    listen debounce window in seconds: how long filesystem events are batched before the callback fires. The corpus is expected to change rarely, so a coarse value (default 10s) coalesces editor save-storms. The only cost is that a change can take roughly that long to reach the index.

  • listen_factory (#call, nil) (defaults to: nil)

    dependency-injection seam for tests: call(dir) { |modified, added, removed| … } returns an object responding to #start / #stop. nil uses the real Listen.to (required lazily — see the gemspec note).
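The listen_factory seam lets a test drive the Watcher without real filesystem events. Below is a minimal sketch of a test double. It relies only on the contract stated above: call(dir) with a block, returning an object that responds to #start / #stop. FakeListener and its #fire helper are hypothetical.

```ruby
# Hypothetical test double: records the callback instead of watching disk.
class FakeListener
  attr_reader :dir, :started, :stopped

  def initialize(dir, &callback)
    @dir = dir
    @callback = callback
    @started = false
    @stopped = false
  end

  def start
    @started = true
    self
  end

  def stop
    @stopped = true
  end

  # Simulate listen delivering one debounced batch of absolute paths.
  def fire(modified: [], added: [], removed: [])
    @callback.call(modified, added, removed)
  end
end

listeners = []
fake_factory = lambda do |dir, &blk|
  FakeListener.new(dir, &blk).tap { |listener| listeners << listener }
end
```

A test could pass listen_factory: fake_factory to Watcher.new and, after #start, replay a batch through the captured listener's #fire.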



# File 'lib/pikuri/vector_db/watcher.rb', line 111

def initialize(indexer:, latency: 10.0, listen_factory: nil)
  @indexer = indexer
  @latency = latency
  @listen_factory = listen_factory || lambda do |dir, &blk|
    require 'listen'
    Listen.to(dir, latency: @latency, &blk)
  end

  # Resolve the watch shape once, here, rather than re-stat'ing
  # per event. {Indexer#root} is the source for a directory tree
  # and the *parent* for a single file, so +source != watch_dir+
  # distinguishes the two without touching the filesystem — and
  # stays correct even if the single watched file is later
  # deleted (a +source.file?+ check would flip to directory mode
  # and start indexing its siblings).
  @source    = @indexer.source.expand_path
  @watch_dir = @indexer.root.expand_path
  @single_file = @source != @watch_dir

  @pending  = {} # path (String) → :reindex / :remove
  @monitor  = Monitor.new
  @cond     = @monitor.new_cond
  @closing  = false
  @started  = false
  @closed   = false
  @worker   = nil
  @listener = nil
  @finalizer_handle = nil
end
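The shape check in #initialize needs nothing but path comparison. Below is a minimal sketch of how such a flag could scope events. The helpers single_file_shape? and in_scope? are hypothetical and stand in for whatever the Watcher does internally. Because neither helper touches the filesystem, both give the same answer after the watched file is deleted.

```ruby
require 'pathname'

# Hypothetical: Indexer#root is the source itself for a tree and the
# source's parent for a single file, so inequality means single-file mode.
def single_file_shape?(source, root)
  Pathname.new(source).expand_path != Pathname.new(root).expand_path
end

# Hypothetical scoping rule built on that flag.
def in_scope?(path, source, root)
  path = Pathname.new(path).expand_path
  if single_file_shape?(source, root)
    # Only the watched file counts, never its siblings in the parent dir.
    path == Pathname.new(source).expand_path
  else
    # Anything strictly below the root; the separator keeps /notes2 out.
    path.to_s.start_with?(Pathname.new(root).expand_path.to_s + File::SEPARATOR)
  end
end
```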

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Stop watching and tear down cleanly. Stops the listener so no new events arrive, then joins the worker — which finishes its in-flight job and exits, abandoning any still-queued work. Idempotent and safe at process exit (the host may have called it already, or only the at_exit finalizer fires).



# File 'lib/pikuri/vector_db/watcher.rb', line 165

def close
  return if @closed

  @closed = true
  @listener&.stop
  @monitor.synchronize do
    @closing = true
    @cond.broadcast
  end
  @worker&.join
  Pikuri::Finalizers.unregister(@finalizer_handle) if @finalizer_handle
  nil
end

#drain! ⇒ void

This method returns an undefined value.

Process every currently-queued job synchronously and return. The worker thread calls this implicitly via its blocking loop; tests call it directly to drain without a thread. Does not wait for new work and does not consult the closing flag.



# File 'lib/pikuri/vector_db/watcher.rb', line 212

def drain!
  loop do
    job = @monitor.synchronize do
      next nil if @pending.empty?

      key = @pending.keys.first
      [key, @pending.delete(key)]
    end
    break if job.nil?

    process(*job)
  end
  nil
end
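The worker thread's blocking counterpart to #drain! can be sketched as a loop on a Monitor condition variable. This is a minimal, self-contained sketch that follows the behavior the class doc describes: the boot sweep is enqueued from the worker thread, the in-flight job finishes on close, and queued jobs are abandoned. WorkerSketch and its methods are hypothetical and are not the Watcher's private work_loop.

```ruby
require 'monitor'

# Hypothetical worker: boot sweep first, then block until work or close.
class WorkerSketch
  attr_reader :done

  def initialize(&job)
    @job = job
    @pending = {} # path => action, last intent wins
    @monitor = Monitor.new
    @cond = @monitor.new_cond
    @closing = false
    @done = []
  end

  def enqueue(action, path)
    @monitor.synchronize do
      @pending[path] = action
      @cond.signal
    end
  end

  def start(boot_plan)
    @thread = Thread.new do
      boot_plan.each { |action, path| enqueue(action, path) } # boot sweep
      loop do
        job = @monitor.synchronize do
          @cond.wait_while { @pending.empty? && !@closing }
          next nil if @closing # abandon whatever is still queued

          key = @pending.keys.first
          [key, @pending.delete(key)]
        end
        break if job.nil?

        @job.call(*job) # runs outside the lock, so enqueue never waits on it
        @done << job
      end
    end
    self
  end

  # Wake the worker and wait for the job it is running (if any) to finish.
  def close
    @monitor.synchronize do
      @closing = true
      @cond.broadcast
    end
    @thread&.join
  end
end
```

Running the job outside the lock lets the listen callback keep enqueueing while a slow re-embed is in progress.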

#process_event(modified, added, removed) ⇒ void

This method returns an undefined value.

The listen callback body, and also the test seam. Translates a batch of changed paths into queued work: modified/added → :reindex, removed → :remove. Non-indexable paths (denylisted dirs, dot-files, anything outside the watch root) are dropped, as are paths reached through a symlink (see the class doc: the rule is strict so that it agrees with Indexer#list_files). The symlink check guards only :reindex. It lstats the path, which a removed path no longer has, and a :remove for a never-indexed symlink source is a harmless no-op anyway.

Parameters:

  • modified (Array<String>)

    absolute paths.

  • added (Array<String>)

    absolute paths.

  • removed (Array<String>)

    absolute paths.



# File 'lib/pikuri/vector_db/watcher.rb', line 193

def process_event(modified, added, removed)
  (Array(modified) + Array(added)).each do |path|
    next unless indexable?(path)
    next if symlinked_below_root?(path)

    enqueue(:reindex, path)
  end
  Array(removed).each do |path|
    enqueue(:remove, path) if indexable?(path)
  end
  nil
end
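The translation rule can be isolated from the Indexer. Below is a minimal sketch with the filters injected as lambdas; translate_batch and its keyword arguments are hypothetical. It reproduces two details of #process_event. Removals are applied after reindexes, so a path reported as both added and removed in one batch ends up as :remove. The symlink filter is also skipped for removals.

```ruby
# Hypothetical stand-alone version of the batch translation.
def translate_batch(modified, added, removed,
                    pending: {},
                    indexable: ->(_path) { true },
                    symlinked: ->(_path) { false })
  (Array(modified) + Array(added)).each do |path|
    next unless indexable.call(path)
    next if symlinked.call(path) # only reindexes are symlink-checked

    pending[path] = :reindex
  end
  Array(removed).each do |path|
    pending[path] = :remove if indexable.call(path)
  end
  pending
end

# Assumed filter for illustration: drop dot-files.
skip_dot = ->(path) { !File.basename(path).start_with?('.') }
out = translate_batch(['/c/a.md'], ['/c/.swp', '/c/b.md'], ['/c/b.md', '/c/gone.md'],
                      indexable: skip_dot)
```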

#start ⇒ self

Subscribe to filesystem events, spawn the worker thread (which runs the boot reconcile sweep, then drains events), and register for teardown. Guards against a double start: a second call raises.

Returns:

  • (self)


# File 'lib/pikuri/vector_db/watcher.rb', line 146

def start
  raise 'Watcher already started' if @started

  @started = true
  subscribe
  @worker = Thread.new { work_loop }
  # Register LAST so LIFO teardown closes the Watcher before the
  # Server::Chroma the host registered earlier — see the class yardoc.
  @finalizer_handle = Pikuri::Finalizers.register(self)
  self
end