Class: Pikuri::VectorDb::Watcher
- Inherits: Object
- Defined in: lib/pikuri/vector_db/watcher.rb
Overview
The auto-watch daemon: it keeps a persistent index in sync with a corpus on disk, without the user ever calling vectordb_reindex. It is a thin layer over the listen gem that drives the Indexer’s per-file entry points (Indexer#reindex_file! / Indexer#remove_file!) from filesystem events, plus a boot reconciliation sweep (Indexer#reconcile_plan). This is the auto-watch half of the incremental-reindex + auto-watch feature; the per-file methods it drives live on Indexer and Backend.
Host-owned, not agent-owned
A web app has many Agents but one corpus and one Watcher, so the Watcher’s lifetime is that of the host process, not of any single agent. The host constructs it once, around the same Indexer (and therefore the same Backend) that every agent’s Extension shares, and calls #start and #close on it as part of the process lifecycle:
server = Pikuri::VectorDb::Server::Chroma.ensure_running
backend = server.client(collection: 'notes')
ext = Pikuri::VectorDb::Extension.new(backend: backend, source: '~/notes')
# ... build agents with `ext` ...
watcher = Pikuri::VectorDb::Watcher.new(indexer: ext.indexer).start
The Extension indexes nothing on its own, so here the Watcher is the population path: its boot reconcile sweep (Indexer#reconcile_plan) fills an empty backend and works the same way for every backend. A host that also indexes some other way (for example, an explicit ext.indexer.index_if_empty! before start) is fine too; the sweep then finds matching hashes and reindexes nothing.
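The boot sweep can be pictured as a diff between two path-to-hash maps: what is on disk now versus what was recorded at index time. The sketch below is hypothetical. `reconcile_sketch` and `file_hash` are illustrative names, not Pikuri's API, and SHA-256 is an assumed hash; only the idea of comparing byte hashes comes from the text. It shows why an empty backend gets every file reindexed while an already-populated one gets nothing.

```ruby
require 'digest'

# Hypothetical sketch, not Pikuri's Indexer#reconcile_plan. SHA-256 is an
# assumption; the text only says the change signal is the file's byte hash.
def file_hash(bytes)
  Digest::SHA256.hexdigest(bytes)
end

# disk_hashes / indexed_hashes: path (String) => hex digest (String).
# Returns path => :reindex / :remove, the same intents the Watcher queues.
def reconcile_sketch(disk_hashes, indexed_hashes)
  plan = {}
  disk_hashes.each do |path, digest|
    # New file, or its bytes changed since it was indexed.
    plan[path] = :reindex unless indexed_hashes[path] == digest
  end
  indexed_hashes.each_key do |path|
    # Indexed earlier, gone from disk now.
    plan[path] = :remove unless disk_hashes.key?(path)
  end
  plan
end

disk    = { 'a.md' => file_hash('same'), 'b.md' => file_hash('new') }
indexed = { 'a.md' => file_hash('same'), 'c.md' => file_hash('old') }
plan = reconcile_sketch(disk, indexed)
# plan maps 'b.md' to :reindex and 'c.md' to :remove; 'a.md' is untouched.
```

Against an empty index every on-disk path comes back as :reindex; against an index that already matches, the plan is empty.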
Single work queue, last-intent-per-path
The boot sweep and the live event stream must not race. They funnel into one @pending map (path → :reindex/:remove) drained by one worker thread, so:
- Rapid edits to one file collapse to a single reindex (the map holds one intent per path); listen’s latency window coalesces most of it before it even reaches us.
- Order doesn’t corrupt content: #process resolves the file from disk *at process time*, and Indexer#reindex_file! of a vanished file is itself a removal, so a stale :reindex that runs after a delete self-corrects rather than writing old bytes.
The boot sweep is enqueued by the worker thread after the listener is already subscribed, so a change made mid-sweep lands in the same queue rather than being lost.
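The last-intent-per-path behaviour follows from using a Hash as the queue. The class below is a hypothetical, single-threaded sketch modelled on the description above (`PendingQueue` is an illustrative name, not part of Pikuri). It relies on one Ruby Hash property worth knowing: reassigning an existing key updates its value but keeps its original insertion position.

```ruby
require 'monitor'

# Hypothetical last-intent-per-path queue, not Pikuri's code. One Hash holds
# at most one intent per path; a newer intent for a path overwrites the older.
class PendingQueue
  def initialize
    @pending = {}            # path (String) => :reindex / :remove
    @monitor = Monitor.new
  end

  def enqueue(intent, path)
    # Reassigning a key keeps its insertion slot, so a path is processed
    # once, in its original position, with whichever intent arrived last.
    @monitor.synchronize { @pending[path] = intent }
  end

  # Pop one job at a time until empty, yielding outside the lock so
  # producers are never blocked behind slow work.
  def drain
    loop do
      job = @monitor.synchronize do
        next nil if @pending.empty?
        key = @pending.keys.first
        [key, @pending.delete(key)]
      end
      break if job.nil?
      yield(*job)
    end
  end
end

queue = PendingQueue.new
queue.enqueue(:reindex, 'notes/a.md')
queue.enqueue(:reindex, 'notes/b.md')
queue.enqueue(:reindex, 'notes/a.md')  # rapid re-edit collapses
queue.enqueue(:remove,  'notes/b.md')  # later delete wins
processed = []
queue.drain { |path, intent| processed << [path, intent] }
# processed is [["notes/a.md", :reindex], ["notes/b.md", :remove]]
```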
Teardown drains the in-flight job
The Watcher registers with Finalizers last (after the host’s Server::Chroma), so LIFO teardown closes it first, before the container is removed with docker rm -f. #close stops the listener (no new events), then joins the worker, which finishes the job it is mid-flight on and exits. That ordering is load-bearing: it guarantees no Backend#replace_source call is still writing when the Chroma container goes down. Remaining queued (not-yet-started) jobs are abandoned; the next boot’s reconcile sweep catches them (the downtime-gap guarantee).
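Why "register last" means "close first" is easiest to see with a toy LIFO registry. This is a hypothetical sketch, not Pikuri::Finalizers: the source only shows register(obj) returning a handle and unregister(handle), so `run_all` and the integer handle scheme are assumptions made for illustration.

```ruby
# Hypothetical LIFO registry illustrating the teardown order; not
# Pikuri::Finalizers. run_all and integer handles are assumptions.
class LifoFinalizers
  def initialize
    @entries = []      # [handle, object] pairs, oldest first
    @next_handle = 0
  end

  def register(obj)
    handle = (@next_handle += 1)
    @entries << [handle, obj]
    handle
  end

  def unregister(handle)
    @entries.reject! { |h, _| h == handle }
  end

  # Close newest-first. Detach the list before iterating so an object whose
  # #close calls unregister (as Watcher#close does) cannot mutate it mid-loop.
  def run_all
    entries = @entries.reverse
    @entries = []
    entries.each { |_, obj| obj.close }
  end
end

Closer = Struct.new(:name, :log) do
  def close
    log << name
  end
end

log = []
registry = LifoFinalizers.new
registry.register(Closer.new('chroma server', log)) # host registers first
registry.register(Closer.new('watcher', log))       # Watcher registers last
registry.run_all
# log is now ["watcher", "chroma server"]
```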
What it deliberately does not do
- **No model/parameter awareness.** The change signal is the file’s byte hash (Indexer#file_hash), so swapping the embedder or chunker config does not trigger a reindex; the recourse is a manual full vectordb_reindex. See Extension for the loud warning.
- **Symlinks are never indexed.** A live event whose path is reached through a symlink below the watch root is dropped: the leaf may be a symlinked file, or an ancestor a symlinked directory. This matches Indexer#list_files, which skips the same paths on the boot walk, so the watch and the walk index exactly the same set. The rule is deliberately strict rather than following the OS default of resolving links. It is a privacy boundary (a link pointing out of the corpus never gets embedded), and it closes off a divergence that following links would otherwise create: a symlink target that changes outside the watched tree fires no event, so following it would leave stale chunks that the walk would later remove.
- **No backpressure.** Re-embedding competes with the chat model on a shared llama-server; vectordb targets low-churn corpora (notes, docs), so a burst is tolerated rather than throttled.
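The symlink rule above covers both a symlinked leaf and a symlinked ancestor directory, which means every path component below the root must be checked with lstat semantics. The method below is a hypothetical sketch: the Watcher's own symlinked_below_root? body is not shown on this page, and this version assumes the path has already passed the "inside the watch root" check.

```ruby
require 'pathname'

# Hypothetical sketch; the Watcher's real symlinked_below_root? is not shown.
# Assumes path already sits under root (an indexable? check runs first).
def symlinked_below_root?(path, root)
  root = File.expand_path(root)   # expand_path does not resolve symlinks
  relative = Pathname.new(File.expand_path(path))
                     .relative_path_from(Pathname.new(root))
  current = root
  # Check root/a, root/a/b, ..., down to the leaf; root itself is trusted.
  relative.each_filename do |part|
    current = File.join(current, part)
    return true if File.symlink?(current)   # lstat-based, does not follow
  end
  false
end
```

Leaving the root itself unchecked matters on systems where the corpus lives under a symlinked prefix (for example a temp directory); only links *below* the root are refused.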
Constant Summary
- LOGGER = Pikuri.logger_for('VectorDb::Watcher')
Instance Method Summary
- #close ⇒ void
  Stop watching and tear down cleanly.
- #drain! ⇒ void
  Process every currently-queued job synchronously and return.
- #initialize(indexer:, latency: 10.0, listen_factory: nil) ⇒ Watcher (constructor)
- #process_event(modified, added, removed) ⇒ void
  The listen callback body, also the test seam.
- #start ⇒ self
  Subscribe to filesystem events, spawn the worker thread (which runs the boot reconcile sweep then drains events), and register for teardown.
Constructor Details
#initialize(indexer:, latency: 10.0, listen_factory: nil) ⇒ Watcher
# File 'lib/pikuri/vector_db/watcher.rb', line 111
def initialize(indexer:, latency: 10.0, listen_factory: nil)
  @indexer = indexer
  @latency = latency
  @listen_factory = listen_factory || lambda do |dir, &blk|
    require 'listen'
    Listen.to(dir, latency: @latency, &blk)
  end
  # Resolve the watch shape once, here, rather than re-stat'ing
  # per event. {Indexer#root} is the source for a directory tree
  # and the *parent* for a single file, so +source != watch_dir+
  # distinguishes the two without touching the filesystem, and
  # stays correct even if the single watched file is later
  # deleted (a +source.file?+ check would flip to directory mode
  # and start indexing its siblings).
  @source = @indexer.source
  @watch_dir = @indexer.root
  @single_file = @source != @watch_dir
  @pending = {} # path (String) → :reindex / :remove
  @monitor = Monitor.new
  @cond = @monitor.new_cond
  @closing = false
  @started = false
  @closed = false
  @worker = nil
  @listener = nil
  @finalizer_handle = nil
end
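The listen_factory: parameter is the seam that lets tests run without the listen gem. The default factory is called with a directory and a block taking (modified, added, removed) and returns a Listen listener. The fake below is a hypothetical illustration of a compatible stand-in; `FakeListener` is an invented name, and it assumes the Watcher calls only #start and #stop on the listener (#stop is visible in #close; #start is an assumption about the unshown subscribe).

```ruby
# Hypothetical stand-in for the listen_factory: seam; not part of Pikuri.
# Assumes the Watcher only calls #start and #stop on what the factory returns.
class FakeListener
  attr_reader :dir, :started, :stopped

  def initialize(dir, &callback)
    @dir = dir
    @callback = callback
    @started = false
    @stopped = false
  end

  def start
    @started = true
    self
  end

  def stop
    @stopped = true
  end

  # Simulate listen delivering one coalesced batch of changes.
  def fire(modified: [], added: [], removed: [])
    @callback.call(modified, added, removed)
  end
end

listeners = []
fake_factory = lambda do |dir, &blk|
  FakeListener.new(dir, &blk).tap { |l| listeners << l }
end

# With the real class this lambda would be passed as
#   Pikuri::VectorDb::Watcher.new(indexer: indexer, listen_factory: fake_factory)
# Here it is exercised directly:
batches = []
listener = fake_factory.call('/notes') { |m, a, r| batches << [m, a, r] }
listener.fire(modified: ['/notes/a.md'], removed: ['/notes/old.md'])
```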
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Stop watching and tear down cleanly. Stops the listener so no new events arrive, then joins the worker — which finishes its in-flight job and exits, abandoning any still-queued work. Idempotent and safe at process exit (the host may have called it already, or only the at_exit finalizer fires).
# File 'lib/pikuri/vector_db/watcher.rb', line 165
def close
  return if @closed
  @closed = true
  @listener&.stop
  @monitor.synchronize do
    @closing = true
    @cond.broadcast
  end
  @worker&.join
  Pikuri::Finalizers.unregister(@finalizer_handle) if @finalizer_handle
  nil
end
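#close relies on a handshake with the worker: it sets the closing flag under the monitor, broadcasts, and joins. The class below is a hypothetical miniature of the worker side (the real work_loop is not shown on this page, and `MiniWorker` is an invented name). The worker sleeps until there is work or closing is set; a job already running always finishes, but once closing is seen the remaining queue is abandoned.

```ruby
require 'monitor'

# Hypothetical miniature of the worker/close handshake; not Pikuri's work_loop.
class MiniWorker
  attr_reader :done

  def initialize(&job)
    @job = job
    @pending = {}   # path => intent, last intent wins
    @monitor = Monitor.new
    @cond = @monitor.new_cond
    @closing = false
    @done = []      # written only by the worker thread
    @thread = Thread.new { work_loop }
  end

  def enqueue(intent, path)
    @monitor.synchronize do
      @pending[path] = intent
      @cond.signal
    end
  end

  def close
    @monitor.synchronize do
      @closing = true
      @cond.broadcast
    end
    @thread.join    # returns once the in-flight job (if any) is done
  end

  private

  def work_loop
    loop do
      job = @monitor.synchronize do
        @cond.wait_while { @pending.empty? && !@closing }
        next nil if @closing          # abandon whatever is still queued
        key = @pending.keys.first
        [key, @pending.delete(key)]
      end
      break if job.nil?
      @job.call(*job)                 # runs outside the lock
      @done << job
    end
  end
end
```

A test can hold the first job open, enqueue a second path, call close, and then release the first job: only the first path ends up in done, mirroring the "finish in-flight, abandon queued" contract.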
#drain! ⇒ void
This method returns an undefined value.
Process every currently-queued job synchronously and return. The worker thread calls this implicitly via its blocking loop; tests call it directly to drain without a thread. Does not wait for new work and does not consult the closing flag.
# File 'lib/pikuri/vector_db/watcher.rb', line 212
def drain!
  loop do
    job = @monitor.synchronize do
      next nil if @pending.empty?
      key = @pending.keys.first
      [key, @pending.delete(key)]
    end
    break if job.nil?
    process(*job)
  end
  nil
end
#process_event(modified, added, removed) ⇒ void
This method returns an undefined value.
The listen callback body, also the test seam. Translates a batch of changed paths into queued work: modified/added → :reindex, removed → :remove. Non-indexable paths (denylisted dirs, dot-files, anything outside the watch root) are dropped, as are paths reached through a symlink (see the class doc: strict, to agree with Indexer#list_files). The symlink check guards only :reindex: it lstats the path, which a removed path no longer has, and a :remove for a never-indexed symlink source is a harmless no-op anyway.
# File 'lib/pikuri/vector_db/watcher.rb', line 193
def process_event(modified, added, removed)
  (Array(modified) + Array(added)).each do |path|
    next unless indexable?(path)
    next if symlinked_below_root?(path)
    enqueue(:reindex, path)
  end
  Array(removed).each do |path|
    enqueue(:remove, path) if indexable?(path)
  end
  nil
end
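process_event delegates the "is this path worth indexing" decision to indexable?, whose body is not shown here. The sketch below is a hypothetical filter following the three rules the doc lists: outside the watch root, dot-files and dot-directories, and denylisted directories. The `DENYLIST` contents are invented for illustration, and single-file mode (where only the one source path would qualify) is not covered.

```ruby
# Hypothetical indexable? filter; not Pikuri's implementation.
# DENYLIST is invented for illustration; the real list is not shown.
DENYLIST = %w[node_modules vendor tmp].freeze

def indexable?(path, root)
  prefix = File.expand_path(root) + File::SEPARATOR
  path = File.expand_path(path)
  return false unless path.start_with?(prefix)   # outside the watch root

  *dirs, leaf = path.delete_prefix(prefix).split(File::SEPARATOR)
  return false if leaf.nil? || leaf.start_with?('.')  # dot-file
  # Any dot-directory or denylisted directory on the way down disqualifies.
  dirs.none? { |dir| dir.start_with?('.') || DENYLIST.include?(dir) }
end
```

Comparing against `root + File::SEPARATOR` rather than bare `root` keeps a sibling such as `/notesextra` from matching a root of `/notes`.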
#start ⇒ self
Subscribe to filesystem events, spawn the worker thread (which runs the boot reconcile sweep and then drains events), and register for teardown. Not idempotent: a second call raises, which guards against a double start.
# File 'lib/pikuri/vector_db/watcher.rb', line 146
def start
  raise 'Watcher already started' if @started
  @started = true
  subscribe
  @worker = Thread.new { work_loop }
  # Register LAST so LIFO teardown closes the Watcher before the
  # Server::Chroma the host registered earlier; see the class yardoc.
  @finalizer_handle = Pikuri::Finalizers.register(self)
  self
end