Class: Pikuri::Memory::Recorder

Inherits:
Object
  • Object
show all
Defined in:
lib/pikuri/memory/recorder.rb

Overview

The off-the-interaction-path capture queue. A single background worker thread drains enqueued user turns into Mem0Client#add, so a turn never blocks on the ~3s extraction round-trip (DESIGN.md §“Why a non-reasoning extraction model”). Reads are cheap and synchronous; writes are deferred here — the read/write asymmetry the design leans on.

Why a thread, not a fork or an external job

mem0’s add is one HTTP call. A thread is the smallest thing that takes it off the turn’s critical path while keeping capture per-turn (so a kill -9 / closed tab loses at most the in-flight turn, not a whole session — the data-safety win that picked a separate extraction model in DESIGN.md §“Extraction model decision”). No external queue, no persistence — durability is “within seconds, in mem0,” not “survives a crash mid-extraction.”

Bounded flush on close

#close signals the worker to stop and joins it with a timeout —so “quit” never hangs for minutes while a backlog digests (DESIGN.md §“Capture: off-path and bounded”). Anything still queued past the timeout is dropped: the worker is a plain Ruby thread, so process exit reaps it. One enqueue per turn means the queue rarely holds more than a single item, so the common case flushes in one add.

Failure is logged, not raised

A Mem0Client#add that raises (mem0 down, timeout, 5xx) is caught, logged at WARN, and the item dropped — a transient capture failure must never crash the worker (which would silently end all future capture) nor surface to the user mid-conversation.

Constant Summary collapse

LOGGER =
Pikuri.logger_for('Memory::Recorder')
STOP =

Returns sentinel pushed by #close to stop the worker after it drains everything enqueued ahead of it.

Returns:

  • (Symbol)

    sentinel pushed by #close to stop the worker after it drains everything enqueued ahead of it.

:__pikuri_memory_stop__
DEFAULT_FLUSH_TIMEOUT =

Returns default seconds #close waits for the worker to drain before giving up and letting process exit reap it.

Returns:

  • (Integer)

    default seconds #close waits for the worker to drain before giving up and letting process exit reap it.

10

Instance Method Summary collapse

Constructor Details

#initialize(client:, user_id:, infer: true, prompt: nil, flush_timeout: DEFAULT_FLUSH_TIMEOUT) ⇒ Recorder

Parameters:

  • client (Mem0Client)

    the mem0 client add is dispatched to.

  • user_id (String)

    the mem0 namespace captures land in.

  • infer (Boolean) (defaults to: true)

    forwarded to Mem0Client#add; true stores extracted facts.

  • prompt (String, nil) (defaults to: nil)

    optional per-request extraction prompt forwarded to Mem0Client#add.

  • flush_timeout (Integer) (defaults to: DEFAULT_FLUSH_TIMEOUT)

    seconds #close blocks waiting for the backlog to drain.



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/pikuri/memory/recorder.rb', line 58

def initialize(client:, user_id:, infer: true, prompt: nil,
               flush_timeout: DEFAULT_FLUSH_TIMEOUT)
  @client = client
  @user_id = user_id
  @infer = infer
  @prompt = prompt
  @flush_timeout = flush_timeout
  @queue = Thread::Queue.new
  @thread = nil
  @closed = false
end

Instance Method Details

#closevoid

This method returns an undefined value.

Stop the worker and flush the backlog, bounded by flush_timeout. Pushes STOP (so items already queued drain first), then joins the worker for at most the timeout. Idempotent. If the timeout elapses with work still pending, the remaining items are abandoned to process-exit reaping — a deliberate bound so quit never hangs.



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/pikuri/memory/recorder.rb', line 101

def close
  return if @closed

  @closed = true
  @queue << STOP
  return unless @thread

  return if @thread.join(@flush_timeout)

  LOGGER.warn("flush did not finish within #{@flush_timeout}s; " \
              "#{@queue.size} memory write(s) abandoned at exit")
  nil
end

#enqueue(content) ⇒ void

This method returns an undefined value.

Enqueue one user turn for asynchronous extraction. Non-blocking (a bounded push to an in-memory queue). Blank content and post-close enqueues are silently ignored.

Parameters:

  • content (String)

    the user’s message to capture.



85
86
87
88
89
90
91
# File 'lib/pikuri/memory/recorder.rb', line 85

def enqueue(content)
  return if @closed
  return if content.nil? || content.to_s.strip.empty?

  @queue << content
  nil
end

#startself

Start the background worker. Idempotent — a second call is a no-op, so Extension#bind can call it without guarding.

Returns:

  • (self)


74
75
76
77
# File 'lib/pikuri/memory/recorder.rb', line 74

def start
  @thread ||= Thread.new { run_loop }
  self
end