Class: Rubino::Session::Store

Inherits:
Object
Defined in:
lib/rubino/session/store.rb

Overview

Persists and queries messages within a session.

Ordering note: created_at is an ISO 8601 timestamp with second precision, so multiple messages can share the same timestamp. Read and delete paths that need a strict total order break ties on the SQLite `rowid` column.

#last_for_role is the entry point used by retry/undo to find the last user (or assistant) turn before rewinding history.

Constructor Details

#initialize(db: nil) ⇒ Store

Returns a new instance of Store.



# File 'lib/rubino/session/store.rb', line 16

def initialize(db: nil)
  @db = db || Rubino.database.db
end

Instance Method Details

#append(message) ⇒ Object

Appends a message to a session and returns it.

Raises:

  • (SessionError)

    if the message is not valid



# File 'lib/rubino/session/store.rb', line 21

def append(message)
  raise SessionError, "Invalid message" unless message.valid?

  @db[:messages].insert(message.to_row)
  message
end

#copy_into(target_session_id, messages) ⇒ Object

Copies messages into another session preserving ALL wire-significant fields. Assistant tool calls live in metadata (not tool_call_id), so dropping metadata orphans the toolUse block and 400s strict providers (Anthropic/Bedrock) on resume. token_count is copied too so the target session’s budget accounting stays accurate.



# File 'lib/rubino/session/store.rb', line 44

def copy_into(target_session_id, messages)
  messages.each do |msg|
    create(
      session_id: target_session_id,
      role: msg.role,
      content: msg.content,
      tool_name: msg.tool_name,
      tool_call_id: msg.tool_call_id,
      token_count: msg.token_count,
      metadata: msg.
    )
  end
end

#count(session_id) ⇒ Object

Returns total message count for a session



# File 'lib/rubino/session/store.rb', line 155

def count(session_id)
  @db[:messages].where(session_id: session_id).count
end

#count_by_role(session_id) ⇒ Object

Returns the message count for a session broken down by role (#382), so `sessions show` can report the real cumulative count and label how many rows are tool messages. The cached sessions.message_count column only tracks top-level turns and hides every assistant(tool_use)/tool(result) row. Returns a Hash of role => count.



# File 'lib/rubino/session/store.rb', line 164

def count_by_role(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .group_and_count(:role)
    .to_hash(:role, :count)
end
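
To make the return shape concrete, here is a minimal in-memory sketch of the same grouping. It does not touch the database: rows are plain hashes with an assumed `:role` key, and `role_counts` and `summary_line` are hypothetical names used only for illustration.

```ruby
# In-memory stand-in for the grouped count: returns { role => count }.
# Rows are plain hashes with an assumed :role key.
def role_counts(rows)
  rows.map { |row| row[:role] }.tally
end

# Hypothetical formatter for a `sessions show`-style line.
def summary_line(counts)
  total = counts.values.sum
  tool = counts.fetch("tool", 0)
  "#{total} messages (#{tool} tool)"
end

rows = [
  { role: "user" },
  { role: "assistant" },
  { role: "tool" },
  { role: "assistant" }
]

puts summary_line(role_counts(rows))
```

A role that never occurs is simply absent from the hash, which is why the sketch reads the tool count with `fetch` and a default of 0.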

#create(session_id:, role:, content:, **attrs) ⇒ Object

Creates and appends a message from attributes



# File 'lib/rubino/session/store.rb', line 29

def create(session_id:, role:, content:, **attrs)
  message = Message.new(
    session_id: session_id,
    role: role,
    content: content,
    **attrs
  )
  append(message)
end

#delete_from_inclusive(session_id, from_id:) ⇒ Integer

Deletes the given message and every message inserted after it. Used by undo/retry/rewind to rewind history.

Cuts on the monotonic `rowid` (insertion order), the same total order #since uses, so the rewind point is unambiguous even if later messages carry an earlier `created_at` than from_id (clock skew).

After the cut, the memory-extraction watermark is clamped (MEM-1, R1-M2). The cursor message may itself have just been deleted, leaving a dangling watermark that made the next extraction re-mine the whole remaining session, which could resurrect a fact the user had just removed with `forget`. The watermark is repaired without moving it forward, so a surviving but not-yet-mined message between the old cursor and the cut is never sealed or lost (see #reseed_extraction_cursor_clamped).

Parameters:

  • session_id (String)
  • from_id (String)

    id of the first message to delete

Returns:

  • (Integer)

    number of rows removed



# File 'lib/rubino/session/store.rb', line 196

def delete_from_inclusive(session_id, from_id:)
  from_rowid = @db[:messages]
               .where(id: from_id, session_id: session_id)
               .get(Sequel.lit("rowid"))
  return 0 unless from_rowid

  removed = @db[:messages]
            .where(session_id: session_id)
            .where(Sequel.lit("rowid >= ?", from_rowid))
            .delete
  reseed_extraction_cursor_clamped(session_id)
  removed
end
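
The effect of cutting on rowid rather than created_at can be seen in a small in-memory model. This is a sketch under stated assumptions, not the store's code: it covers a single session, rows are plain hashes with assumed `:id`, `:rowid` and `:created_at` keys, and `rewind_inclusive` is a hypothetical name. It also leaves out the watermark repair.

```ruby
# In-memory model of an inclusive rewind for one session.
# Rows are hashes with assumed :id, :rowid and :created_at keys.
# Returns [survivors, removed_count].
def rewind_inclusive(rows, from_id)
  from = rows.find { |row| row[:id] == from_id }
  return [rows, 0] unless from

  survivors = rows.select { |row| row[:rowid] < from[:rowid] }
  [survivors, rows.size - survivors.size]
end

rows = [
  { id: "a", rowid: 1, created_at: "2024-01-01T10:00:05Z" },
  { id: "b", rowid: 2, created_at: "2024-01-01T10:00:06Z" },
  # Inserted last, but stamped earlier than "b" (clock stepped backward).
  { id: "c", rowid: 3, created_at: "2024-01-01T10:00:02Z" }
]

survivors, removed = rewind_inclusive(rows, "b")
puts "removed=#{removed} survivors=#{survivors.map { |row| row[:id] }.join(',')}"
```

In this model, rewinding from "b" removes both "b" and "c". A cut on created_at would have kept "c", because its timestamp is earlier than that of "b".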

#for_session(session_id, limit: nil) ⇒ Object

Returns all messages for a session in chronological order. created_at is second-precision, so we tie-break on rowid — without this, an assistant preamble and a tool_result persisted in the same second can come back swapped, which makes the resumed transcript look like the tool fired before the model’s preamble (or worse, like an empty assistant box wrapping the tool).



# File 'lib/rubino/session/store.rb', line 64

def for_session(session_id, limit: nil)
  dataset = @db[:messages]
            .where(session_id: session_id)
            .order(:created_at, Sequel.lit("rowid"))
  dataset = dataset.limit(limit) if limit
  dataset.all.map { |row| hydrate(row) }
end
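
The tie-break can be illustrated without a database. The sketch below is a minimal in-memory model of the ordering rule only. It assumes rows are plain hashes with `:rowid`, `:created_at` and `:role` keys, and `chronological` is a hypothetical helper name.

```ruby
# In-memory model of the ordering rule: sort by created_at, then by rowid.
# The row shape (:rowid, :created_at, :role) is assumed for illustration.
def chronological(rows)
  rows.sort_by { |row| [row[:created_at], row[:rowid]] }
end

rows = [
  # Same second as the assistant preamble, but inserted after it.
  { rowid: 2, created_at: "2024-01-01T10:00:00Z", role: "tool" },
  { rowid: 1, created_at: "2024-01-01T10:00:00Z", role: "assistant" },
  { rowid: 3, created_at: "2024-01-01T10:00:01Z", role: "assistant" }
]

puts chronological(rows).map { |row| row[:role] }.join(", ")
```

Sorting on created_at alone would leave the relative order of the first two rows unspecified. Adding rowid as the second key puts the assistant preamble before the tool result.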

#last_for_role(session_id, role) ⇒ Object

Returns the most recent message for `role` (e.g. “user”, “assistant”). Ties are broken on rowid like the other read paths. Used by retry/undo.



# File 'lib/rubino/session/store.rb', line 254

def last_for_role(session_id, role)
  row = @db[:messages]
        .where(session_id: session_id, role: role)
        .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
        .first
  row && hydrate(row)
end

#last_id(session_id) ⇒ Object

Returns the id of the newest message in a session (by insertion `rowid`), or nil for an empty session. Used to advance or seed the memory-extraction cursor. It orders by rowid (not created_at) so the watermark tracks insertion order and a backdated tail message still becomes the new cursor.



# File 'lib/rubino/session/store.rb', line 99

def last_id(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .order(Sequel.desc(Sequel.lit("rowid")))
    .get(:id)
end

#reseed_extraction_cursor_clamped(session_id) ⇒ Object

Repair the memory-extraction watermark after a DELETE (undo/retry rewind) without ever sealing an un-mined survivor — the cursor only ever moves BACKWARD here, never forward (MEM-1, R1-M2).

The cursor means “every message up to and including this rowid has been mined”. A delete can leave it dangling (the cursor message itself was cut). Naively re-seeding to the new tail would jump the watermark PAST any surviving message that sat between the old cursor and the cut point — those were never extracted, and sealing them silently drops their facts.

So we clamp: the new cursor is the newest SURVIVING message whose rowid is <= the old cursor’s rowid (the last position we KNOW was mined).

* old cursor still survives  -> unchanged (later survivors stay un-mined
  and get extracted next turn);
* old cursor was deleted     -> falls back to the newest survivor at-or-
  before it (every survivor predates the cut, so this is the new tail);
* old cursor was nil         -> stays nil (never-extracted: re-mine all).

No-op when the session row is absent. Returns the (possibly unchanged) id.



# File 'lib/rubino/session/store.rb', line 145

def reseed_extraction_cursor_clamped(session_id)
  return nil unless @db[:sessions].where(id: session_id).any?

  old_cursor = @db[:sessions].where(id: session_id).get(:memory_extracted_msg_id)
  clamped = newest_surviving_at_or_before(session_id, old_cursor)
  @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: clamped)
  clamped
end
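
The three cases listed above can be checked against a small model of the clamp. This is a sketch, not the real helper: the method shown calls a private `newest_surviving_at_or_before` whose body does not appear on this page, so `clamp_cursor` below is a hypothetical stand-in. It assumes the old cursor's rowid is already known and works on bare rowids instead of message ids.

```ruby
# Model of the clamp over bare rowids.
# surviving_rowids: rowids still present after the delete.
# old_cursor_rowid: rowid the watermark pointed at before the delete
#                   (nil means the session was never extracted).
def clamp_cursor(surviving_rowids, old_cursor_rowid)
  return nil if old_cursor_rowid.nil?

  surviving_rowids.select { |rowid| rowid <= old_cursor_rowid }.max
end

# Cursor survives: unchanged, so rowids 3 and 4 stay un-mined.
puts clamp_cursor([1, 2, 3, 4], 2).inspect
# Cursor (rowid 5) was removed by a cut at rowid 4: falls back to 3.
puts clamp_cursor([1, 2, 3], 5).inspect
# Never-extracted session: stays nil.
puts clamp_cursor([1, 2, 3], nil).inspect
```

Because the result is always at or before the old cursor, the watermark in this model can only stay put or move backward.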

#search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20) ⇒ Array<Hash>

Full-text search across messages backed by the `messages_fts` FTS5 virtual table (see migration 007). Returns hydrated rows with an FTS5 snippet() highlighting the match. Filters compose on top of the FTS MATCH so the index does the heavy lifting and SQL prunes the rest.

Parameters:

  • query (String)

    FTS5 MATCH expression; sanitized via Quoting

  • since (String, nil) (defaults to: nil)

    iso8601 lower bound on created_at

  • until_ (String, nil) (defaults to: nil)

    iso8601 upper bound on created_at

  • role (String, nil) (defaults to: nil)

    restrict to a specific message role

  • tool (String, nil) (defaults to: nil)

    restrict to a specific tool_name

  • limit (Integer) (defaults to: 20)

    cap on rows returned (max 100)

Returns:

  • (Array<Hash>)

    rows: session_id, run_id (nil — not tracked on messages), message_id, role, snippet, created_at



# File 'lib/rubino/session/store.rb', line 223

def search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20)
  return [] if query.nil? || query.to_s.strip.empty?

  limit = limit.to_i.clamp(1, 100)
  match_query = sanitize_fts_query(query)

  dataset = @db[:messages_fts]
            .where(Sequel.lit("messages_fts MATCH ?", match_query))
            .join(:messages, Sequel[:messages][:rowid] => Sequel[:messages_fts][:rowid])
            .select(
              Sequel[:messages][:id].as(:message_id),
              Sequel[:messages][:session_id],
              Sequel[:messages][:role],
              Sequel[:messages][:created_at],
              Sequel.lit("snippet(messages_fts, 0, '<mark>', '</mark>', '...', 16) AS snippet")
            )

  dataset = dataset.where(Sequel[:messages][:role] => role) if role
  dataset = dataset.where(Sequel[:messages][:tool_name] => tool) if tool
  dataset = dataset.where(Sequel.lit("messages.created_at >= ?", since)) if since
  dataset = dataset.where(Sequel.lit("messages.created_at <= ?", until_)) if until_

  dataset
    .order(Sequel.desc(Sequel[:messages][:created_at]), Sequel.desc(Sequel.lit("messages.rowid")))
    .limit(limit)
    .all
    .map { |row| row.merge(run_id: nil) }
end
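
The two guards at the top of the method (blank query, clamped limit) are easy to get wrong when calling it, so here is a small standalone sketch of just that input handling. `MAX_SEARCH_LIMIT`, `normalize_limit` and `blank_query?` are hypothetical names; the method above inlines the same expressions.

```ruby
MAX_SEARCH_LIMIT = 100 # hypothetical constant name; the method inlines 100

# Mirrors `limit.to_i.clamp(1, 100)`.
def normalize_limit(limit)
  limit.to_i.clamp(1, MAX_SEARCH_LIMIT)
end

# Mirrors the early-return guard on the query.
def blank_query?(query)
  query.nil? || query.to_s.strip.empty?
end

puts normalize_limit(500)  # above the cap
puts normalize_limit(0)    # below the floor
puts normalize_limit("25") # numeric string
puts blank_query?("   ")   # whitespace-only query
```

A call such as `search(query: "timeout", role: "tool", limit: 500)` would therefore return at most 100 rows, and a whitespace-only query returns an empty array without touching the index.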

#seed_extraction_cursor(session_id) ⇒ Object

Seed/reset this session’s memory-extraction watermark to its current last message (by rowid) so the extractor’s next turn feeds only what is added AFTER this point — not the whole transcript.

Used by fork/branch/compaction, which copy a FULLY-MINED transcript into a fresh child whose cursor starts NULL — without seeding, the child would re-mine the ENTIRE copied transcript on its first turn (MEM-2). The caller MUST have flushed/extracted the source up to its tail first (compaction flushes before copy; #branch_runner now does too), so every copied message is already mined and sealing the cursor at the tail loses nothing.

Sets the cursor to nil for an empty session (the never-extracted state). No-op when the session row is absent. Returns the new cursor id (or nil).



# File 'lib/rubino/session/store.rb', line 119

def seed_extraction_cursor(session_id)
  return nil unless @db[:sessions].where(id: session_id).any?

  new_cursor = last_id(session_id)
  @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: new_cursor)
  new_cursor
end

#since(session_id, after_id:) ⇒ Object

Returns messages strictly NEWER than after_id, in INSERTION order. Used by the memory extractor’s per-session cursor (#249): feeding only the messages a turn actually added, instead of an overlapping recency window.

Ordering is on the monotonic `rowid`, NOT the wall-clock `created_at`, so a message whose `created_at` regresses (backward clock step, NTP correction, VM suspend) is still seen as “new” and never silently skipped (MEM-3): its rowid is strictly greater than the cursor’s even when its timestamp is smaller. rowid is SQLite’s append-only insertion counter, exactly the “what arrived after the watermark” semantics the cursor wants. A nil/unknown after_id (never-extracted session) returns the whole session in order.



# File 'lib/rubino/session/store.rb', line 84

def since(session_id, after_id:)
  cursor_rowid = after_id && @db[:messages]
                 .where(id: after_id, session_id: session_id)
                 .get(Sequel.lit("rowid"))
  ds = @db[:messages]
       .where(session_id: session_id)
       .order(Sequel.lit("rowid"))
  ds = ds.where(Sequel.lit("rowid > ?", cursor_rowid)) if cursor_rowid
  ds.all.map { |row| hydrate(row) }
end
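
The cursor semantics can be sketched with plain arrays. The model below is illustrative only: it covers a single session, rows are hashes with assumed `:id`, `:rowid` and `:created_at` keys, and `newer_than` is a hypothetical name standing in for the query above.

```ruby
# In-memory model: rows strictly after the cursor, in insertion order.
# A nil or unknown after_id returns every row.
def newer_than(rows, after_id)
  cursor = after_id && rows.find { |row| row[:id] == after_id }
  ordered = rows.sort_by { |row| row[:rowid] }
  return ordered unless cursor

  ordered.select { |row| row[:rowid] > cursor[:rowid] }
end

rows = [
  { id: "m1", rowid: 1, created_at: "2024-01-01T10:00:10Z" },
  { id: "m2", rowid: 2, created_at: "2024-01-01T10:00:11Z" },
  # Inserted after m2, but the clock had stepped backward.
  { id: "m3", rowid: 3, created_at: "2024-01-01T10:00:04Z" }
]

puts newer_than(rows, "m2").map { |row| row[:id] }.inspect
puts newer_than(rows, nil).map { |row| row[:id] }.inspect
```

With the cursor at "m2", the model still returns "m3" even though its timestamp is the smallest of the three. A comparison on created_at would have skipped it.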

#token_sum(session_id) ⇒ Object

Returns estimated token sum for a session



# File 'lib/rubino/session/store.rb', line 172

def token_sum(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .sum(:token_count) || 0
end