Class: Pikuri::Memory::Recorder
- Inherits: Object
  - Object
  - Pikuri::Memory::Recorder
- Defined in: lib/pikuri/memory/recorder.rb
Overview
The off-the-interaction-path capture queue. A single background worker thread drains enqueued user turns into Mem0Client#add, so a turn never blocks on the ~3s extraction round-trip (DESIGN.md §“Why a non-reasoning extraction model”). Reads are cheap and synchronous; writes are deferred here — the read/write asymmetry the design leans on.
Why a thread, not a fork or an external job
mem0’s add is one HTTP call. A thread is the smallest thing that takes it off the turn’s critical path while keeping capture per-turn (so a kill -9 / closed tab loses at most the in-flight turn, not a whole session — the data-safety win that picked a separate extraction model in DESIGN.md §“Extraction model decision”). No external queue, no persistence — durability is “within seconds, in mem0,” not “survives a crash mid-extraction.”
Bounded flush on close
#close signals the worker to stop and joins it with a timeout, so “quit” never hangs for minutes while a backlog digests (DESIGN.md §“Capture: off-path and bounded”). Anything still queued past the timeout is dropped: the worker is a plain Ruby thread, so process exit reaps it. One enqueue per turn means the queue rarely holds more than a single item, so the common case flushes in one add.
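The bound described above rests on how Ruby's Thread#join behaves when given a limit: it returns the thread if it finished in time and nil otherwise. A minimal standalone sketch, using throwaway demo threads rather than the Recorder's own worker:

```ruby
# Thread#join(limit) returns the thread if it finished within `limit`
# seconds, or nil if the limit elapsed first.
fast = Thread.new { :done }
slow = Thread.new { sleep 5 }

fast_result = fast.join(1)     # finishes well inside the limit
slow_result = slow.join(0.05)  # limit elapses while the thread still sleeps

puts "fast finished: #{!fast_result.nil?}"
puts "slow finished: #{!slow_result.nil?}"

slow.kill # tidy up the demo thread; the Recorder relies on process exit instead
```

A nil result is what lets #close tell a clean flush apart from a timeout.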
Failure is logged, not raised
A Mem0Client#add that raises (mem0 down, timeout, 5xx) is caught, logged at WARN, and the item dropped — a transient capture failure must never crash the worker (which would silently end all future capture) nor surface to the user mid-conversation.
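The private worker loop is not shown on this page, so the following is only a sketch of the pattern described above, not the actual run_loop. The client stand-in, its single-argument #add, the log message, and the `stop` symbol are all assumptions made for illustration.

```ruby
require 'logger'
require 'stringio'

stop = :__demo_stop__ # stand-in for the STOP sentinel

# Hypothetical client: #add raises for one input to simulate mem0 being down.
stored = []
client = Object.new
client.define_singleton_method(:add) do |content|
  raise IOError, 'mem0 unavailable' if content == 'boom'

  stored << content
end

log_io = StringIO.new
logger = Logger.new(log_io)

# Assumed shape of the worker loop: pop, stop on the sentinel, and rescue
# per item so one failed write cannot end the thread.
run_loop = lambda do |queue|
  loop do
    item = queue.pop
    break if item.equal?(stop)

    begin
      client.add(item)
    rescue StandardError => e
      logger.warn("memory write dropped: #{e.class}: #{e.message}")
    end
  end
end

queue = Thread::Queue.new
worker = Thread.new { run_loop.call(queue) }
%w[first boom third].each { |turn| queue << turn }
queue << stop
worker.join(2)

puts stored.inspect
```

Because the rescue sits inside the loop body, the failed item is dropped and the items queued after it are still written.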
Constant Summary
- LOGGER = Pikuri.logger_for('Memory::Recorder')
- STOP = :__pikuri_memory_stop__
  Sentinel pushed by #close to stop the worker after it drains everything enqueued ahead of it.
- DEFAULT_FLUSH_TIMEOUT = 10
  Default seconds #close waits for the worker to drain before giving up and letting process exit reap it.
Instance Method Summary
- #close ⇒ void
  Stop the worker and flush the backlog, bounded by flush_timeout.
- #enqueue(content) ⇒ void
  Enqueue one user turn for asynchronous extraction.
- #initialize(client:, user_id:, infer: true, prompt: nil, flush_timeout: DEFAULT_FLUSH_TIMEOUT) ⇒ Recorder (constructor)
- #start ⇒ self
  Start the background worker.
Constructor Details
#initialize(client:, user_id:, infer: true, prompt: nil, flush_timeout: DEFAULT_FLUSH_TIMEOUT) ⇒ Recorder
    # File 'lib/pikuri/memory/recorder.rb', line 58
    def initialize(client:, user_id:, infer: true, prompt: nil, flush_timeout: DEFAULT_FLUSH_TIMEOUT)
      @client = client
      @user_id = user_id
      @infer = infer
      @prompt = prompt
      @flush_timeout = flush_timeout
      @queue = Thread::Queue.new
      @thread = nil
      @closed = false
    end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Stop the worker and flush the backlog, bounded by flush_timeout. Pushes STOP (so items already queued drain first), then joins the worker for at most the timeout. Idempotent. If the timeout elapses with work still pending, the remaining items are abandoned to process-exit reaping — a deliberate bound so quit never hangs.
    # File 'lib/pikuri/memory/recorder.rb', line 101
    def close
      return if @closed

      @closed = true
      @queue << STOP
      return unless @thread
      return if @thread.join(@flush_timeout)

      LOGGER.warn("flush did not finish within #{@flush_timeout}s; " \
                  "#{@queue.size} memory write(s) abandoned at exit")
      nil
    end
#enqueue(content) ⇒ void
This method returns an undefined value.
Enqueue one user turn for asynchronous extraction. Non-blocking (a push to an unbounded in-memory Thread::Queue). Blank content and post-close enqueues are silently ignored.
    # File 'lib/pikuri/memory/recorder.rb', line 85
    def enqueue(content)
      return if @closed
      return if content.nil? || content.to_s.strip.empty?

      @queue << content
      nil
    end
#start ⇒ self
Start the background worker. Idempotent — a second call is a no-op, so Extension#bind can call it without guarding.
    # File 'lib/pikuri/memory/recorder.rb', line 74
    def start
      @thread ||= Thread.new { run_loop }
      self
    end