Class: Rubino::Session::Store
- Inherits: Object (Object > Rubino::Session::Store)
- Defined in: lib/rubino/session/store.rb
Overview
Persists and queries messages within a session.
Ordering note: created_at is an ISO 8601 timestamp with second precision, so multiple messages can share the same timestamp. Read and delete paths that need a strict total order break ties on the SQLite `rowid` column.
#last_for_role is the entry point used by retry/undo to find the last user (or assistant) turn before rewinding history.
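As a minimal sketch of the tie-break described in the ordering note, the snippet below sorts in-memory rows by timestamp and then by rowid. The `Row` struct, the `chronological` helper and the sample data are hypothetical stand-ins; the store itself does this ordering in SQL.

```ruby
# Hypothetical in-memory stand-in for a messages row; only the fields
# needed to show the ordering rule are modelled.
Row = Struct.new(:rowid, :role, :created_at, keyword_init: true)

# Two rows share the same second-precision timestamp.
SAMPLE_ROWS = [
  Row.new(rowid: 3, role: "tool",      created_at: "2024-05-01T10:00:00Z"),
  Row.new(rowid: 1, role: "user",      created_at: "2024-05-01T09:59:59Z"),
  Row.new(rowid: 2, role: "assistant", created_at: "2024-05-01T10:00:00Z")
].freeze

# Sorts by timestamp first, then by insertion rowid, so rows that share a
# timestamp come back in the order in which they were inserted.
def chronological(rows)
  rows.sort_by { |row| [row.created_at, row.rowid] }
end

puts chronological(SAMPLE_ROWS).map(&:role).inspect
```

Sorting on the timestamp alone would leave the relative order of the assistant and tool rows unspecified, which is the swap the ordering note warns about.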
Instance Method Summary
- #append(message) ⇒ Object
  Appends a message to a session.
- #copy_into(target_session_id, messages) ⇒ Object
  Copies messages into another session preserving ALL wire-significant fields.
- #count(session_id) ⇒ Object
  Returns total message count for a session.
- #count_by_role(session_id) ⇒ Object
  Message count for a session broken down by role (#382), so `sessions show` can report the REAL cumulative count and label how many rows are tool messages. The cached sessions.message_count column only tracks top-level turns and hides every assistant(tool_use)/tool(result) row.
- #create(session_id:, role:, content:, **attrs) ⇒ Object
  Creates and appends a message from attributes.
- #delete_from_inclusive(session_id, from_id:) ⇒ Integer
  Deletes the given message and every message inserted after it.
- #for_session(session_id, limit: nil) ⇒ Object
  Returns all messages for a session in chronological order.
- #initialize(db: nil) ⇒ Store (constructor)
  A new instance of Store.
- #last_for_role(session_id, role) ⇒ Object
  Returns the most recent message for `role` (e.g. “user”, “assistant”).
- #last_id(session_id) ⇒ Object
  The id of the newest message in a session (by insertion `rowid`), or nil for an empty session.
- #recent(session_id, count:) ⇒ Object
  Returns the N most recent messages for a session.
- #reseed_extraction_cursor_clamped(session_id) ⇒ Object
  Repair the memory-extraction watermark after a DELETE (undo/retry rewind) without ever sealing an un-mined survivor: the cursor only ever moves BACKWARD here, never forward (MEM-1, R1-M2).
- #search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20) ⇒ Array<Hash>
  Full-text search across messages backed by the `messages_fts` FTS5 virtual table (see migration 007).
- #seed_extraction_cursor(session_id) ⇒ Object
  Seed/reset this session’s memory-extraction watermark to its current last message (by rowid) so the extractor’s next turn feeds only what is added AFTER this point, not the whole transcript.
- #since(session_id, after_id:) ⇒ Object
  Returns messages strictly NEWER than after_id, in INSERTION order.
- #token_sum(session_id) ⇒ Object
  Returns estimated token sum for a session.
Constructor Details
Instance Method Details
#append(message) ⇒ Object
Appends a message to a session.
    # File 'lib/rubino/session/store.rb', line 21
    def append(message)
      raise SessionError, "Invalid message" unless message.valid?

      @db[:messages].insert(message.to_row)
    end
#copy_into(target_session_id, messages) ⇒ Object
Copies messages into another session preserving ALL wire-significant fields. Assistant tool calls live in metadata (not tool_call_id), so dropping metadata orphans the toolUse block and 400s strict providers (Anthropic/Bedrock) on resume. token_count is copied too so the target session’s budget accounting stays accurate.
    # File 'lib/rubino/session/store.rb', line 44
    def copy_into(target_session_id, messages)
      messages.each do |msg|
        create(
          session_id: target_session_id,
          role: msg.role,
          content: msg.content,
          tool_name: msg.tool_name,
          tool_call_id: msg.tool_call_id,
          token_count: msg.token_count,
          metadata: msg.metadata
        )
      end
    end
#count(session_id) ⇒ Object
Returns total message count for a session.
    # File 'lib/rubino/session/store.rb', line 166
    def count(session_id)
      @db[:messages].where(session_id: session_id).count
    end
#count_by_role(session_id) ⇒ Object
Message count for a session broken down by role (#382), so `sessions show` can report the REAL cumulative count and label how many rows are tool messages. The cached sessions.message_count column only tracks top-level turns and hides every assistant(tool_use)/tool(result) row. Returns a Hash role => count.
    # File 'lib/rubino/session/store.rb', line 175
    def count_by_role(session_id)
      @db[:messages]
        .where(session_id: session_id)
        .group_and_count(:role)
        .to_hash(:role, :count)
    end
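To illustrate the role => count shape this method returns, here is a small pure-Ruby sketch that groups a list of roles with `Enumerable#tally`. The role list and both helper names are hypothetical; the real method does the grouping in SQL.

```ruby
# Hypothetical role column of one session, in insertion order.
SESSION_ROLES = %w[user assistant tool assistant tool tool assistant].freeze

# Builds the same role => count Hash shape using Enumerable#tally.
def count_by_role_sketch(roles)
  roles.tally
end

# Number of tool rows, i.e. rows a top-level-only counter would not include.
def tool_row_count(counts)
  counts.fetch("tool", 0)
end

counts = count_by_role_sketch(SESSION_ROLES)
puts "#{counts.values.sum} messages, #{tool_row_count(counts)} of them tool rows"
```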
#create(session_id:, role:, content:, **attrs) ⇒ Object
Creates and appends a message from attributes.
    # File 'lib/rubino/session/store.rb', line 29
    def create(session_id:, role:, content:, **attrs)
      message = Message.new(
        session_id: session_id,
        role: role,
        content: content,
        **attrs
      )
      append(message)
    end
#delete_from_inclusive(session_id, from_id:) ⇒ Integer
Deletes the given message and every message inserted after it. Used by undo/retry/rewind to rewind history.
Cuts on the monotonic `rowid` (insertion order), the same total order #since now uses, so the rewind point is unambiguous even if later messages carry an earlier `created_at` than from_id (clock skew).
After the cut, the memory-extraction watermark is CLAMPED (MEM-1, R1-M2): the cursor message may itself have just been deleted, leaving a dangling watermark that made the next extraction re-mine the whole remaining session, which could resurrect a fact the user had just removed with `forget`. We repair it WITHOUT moving it forward, so a surviving but not-yet-mined message between the old cursor and the cut is never sealed/lost (see #reseed_extraction_cursor_clamped).
    # File 'lib/rubino/session/store.rb', line 207
    def delete_from_inclusive(session_id, from_id:)
      from_rowid = @db[:messages]
        .where(id: from_id, session_id: session_id)
        .get(Sequel.lit("rowid"))
      return 0 unless from_rowid

      removed = @db[:messages]
        .where(session_id: session_id)
        .where(Sequel.lit("rowid >= ?", from_rowid))
        .delete
      reseed_extraction_cursor_clamped(session_id)
      removed
    end
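The effect of cutting on rowid rather than on created_at can be sketched in plain Ruby. Everything here (`SkewRow`, `skewed_rows`, `delete_from_inclusive_sketch`) is a hypothetical in-memory model, and the cursor repair step is left out.

```ruby
# Hypothetical in-memory row. created_at is deliberately out of step with
# rowid for the last row, imitating clock skew.
SkewRow = Struct.new(:rowid, :id, :created_at, keyword_init: true)

# Returns a fresh, mutable sample session.
def skewed_rows
  [
    SkewRow.new(rowid: 1, id: "a", created_at: "2024-05-01T10:00:05Z"),
    SkewRow.new(rowid: 2, id: "b", created_at: "2024-05-01T10:00:07Z"),
    SkewRow.new(rowid: 3, id: "c", created_at: "2024-05-01T10:00:06Z")
  ]
end

# Removes the row with from_id and every row inserted after it, and returns
# the number of rows removed (0 when from_id is unknown).
def delete_from_inclusive_sketch(rows, from_id:)
  from = rows.find { |row| row.id == from_id }
  return 0 unless from

  before = rows.size
  rows.reject! { |row| row.rowid >= from.rowid }
  before - rows.size
end

rows = skewed_rows
removed = delete_from_inclusive_sketch(rows, from_id: "b")
puts "removed #{removed}, kept #{rows.map(&:id).inspect}"
```

In this sample, a cut on `created_at >= "2024-05-01T10:00:07Z"` would have removed only row "b" and left the later-inserted row "c" behind.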
#for_session(session_id, limit: nil) ⇒ Object
Returns all messages for a session in chronological order. created_at is second-precision, so we tie-break on rowid — without this, an assistant preamble and a tool_result persisted in the same second can come back swapped, which makes the resumed transcript look like the tool fired before the model’s preamble (or worse, like an empty assistant box wrapping the tool).
    # File 'lib/rubino/session/store.rb', line 64
    def for_session(session_id, limit: nil)
      dataset = @db[:messages]
        .where(session_id: session_id)
        .order(:created_at, Sequel.lit("rowid"))
      dataset = dataset.limit(limit) if limit
      dataset.all.map { |row| hydrate(row) }
    end
#last_for_role(session_id, role) ⇒ Object
Returns the most recent message for `role` (e.g. “user”, “assistant”). Tie-broken on rowid like the other read paths. Used by retry/undo.
    # File 'lib/rubino/session/store.rb', line 265
    def last_for_role(session_id, role)
      row = @db[:messages]
        .where(session_id: session_id, role: role)
        .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
        .first
      row && hydrate(row)
    end
#last_id(session_id) ⇒ Object
The id of the newest message in a session (by insertion `rowid`), or nil for an empty session. Used to advance/seed the memory-extraction cursor. Ordering is on rowid (not created_at) so the watermark tracks insertion order and a backdated tail message still becomes the new cursor.
    # File 'lib/rubino/session/store.rb', line 110
    def last_id(session_id)
      @db[:messages]
        .where(session_id: session_id)
        .order(Sequel.desc(Sequel.lit("rowid")))
        .get(:id)
    end
#recent(session_id, count:) ⇒ Object
Returns the N most recent messages for a session.
    # File 'lib/rubino/session/store.rb', line 73
    def recent(session_id, count:)
      @db[:messages]
        .where(session_id: session_id)
        .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
        .limit(count)
        .all
        .reverse
        .map { |row| hydrate(row) }
    end
#reseed_extraction_cursor_clamped(session_id) ⇒ Object
Repair the memory-extraction watermark after a DELETE (undo/retry rewind) without ever sealing an un-mined survivor — the cursor only ever moves BACKWARD here, never forward (MEM-1, R1-M2).
The cursor means “every message up to and including this rowid has been mined”. A delete can leave it dangling (the cursor message itself was cut). Naively re-seeding to the new tail would jump the watermark PAST any surviving message that sat between the old cursor and the cut point — those were never extracted, and sealing them silently drops their facts.
So we clamp: the new cursor is the newest SURVIVING message whose rowid is <= the old cursor’s rowid (the last position we KNOW was mined).
* old cursor still survives -> unchanged (later survivors stay un-mined and get extracted next turn);
* old cursor was deleted -> falls back to the newest survivor at-or-before it (every survivor predates the cut, so this is the new tail);
* old cursor was nil -> stays nil (never-extracted: re-mine all).
No-op when the session row is absent. Returns the (possibly unchanged) id.
    # File 'lib/rubino/session/store.rb', line 156
    def reseed_extraction_cursor_clamped(session_id)
      return nil unless @db[:sessions].where(id: session_id).any?

      old_cursor = @db[:sessions].where(id: session_id).get(:memory_extracted_msg_id)
      clamped = newest_surviving_at_or_before(session_id, old_cursor)
      @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: clamped)
      clamped
    end
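The clamping rule can be sketched on plain integers. This is a simplified model, not the store's private `newest_surviving_at_or_before` helper (whose body is not shown here): it assumes the cursor is already known as a rowid, whereas the real column stores a message id. The name `clamp_cursor` is hypothetical.

```ruby
# surviving:  rowids still present after the delete.
# old_cursor: rowid the watermark pointed at before the delete,
#             or nil for a never-extracted session.
# Returns the newest surviving rowid at or before old_cursor, or nil.
def clamp_cursor(surviving, old_cursor)
  return nil if old_cursor.nil?

  surviving.select { |rowid| rowid <= old_cursor }.max
end

# Cursor survives: unchanged, so rows 3 and 4 are still mined next turn.
puts clamp_cursor([1, 2, 3, 4], 2).inspect
# Cursor (rowid 5) was deleted: falls back to the newest survivor before it.
puts clamp_cursor([1, 2, 3], 5).inspect
# Never extracted: stays nil.
puts clamp_cursor([1, 2, 3], nil).inspect
```

Because the result is always at or before `old_cursor`, the watermark cannot jump past a survivor that was never mined.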
#search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20) ⇒ Array<Hash>
Full-text search across messages backed by the `messages_fts` FTS5 virtual table (see migration 007). Returns hydrated rows with an FTS5 snippet() highlighting the match. Filters compose on top of the FTS MATCH so the index does the heavy lifting and SQL prunes the rest.
    # File 'lib/rubino/session/store.rb', line 234
    def search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20)
      return [] if query.nil? || query.to_s.strip.empty?

      limit = limit.to_i.clamp(1, 100)
      match_query = sanitize_fts_query(query)

      dataset = @db[:messages_fts]
        .where(Sequel.lit("messages_fts MATCH ?", match_query))
        .join(:messages, Sequel[:messages][:rowid] => Sequel[:messages_fts][:rowid])
        .select(
          Sequel[:messages][:id].as(:message_id),
          Sequel[:messages][:session_id],
          Sequel[:messages][:role],
          Sequel[:messages][:created_at],
          Sequel.lit("snippet(messages_fts, 0, '<mark>', '</mark>', '...', 16) AS snippet")
        )

      dataset = dataset.where(Sequel[:messages][:role] => role) if role
      dataset = dataset.where(Sequel[:messages][:tool_name] => tool) if tool
      dataset = dataset.where(Sequel.lit("messages.created_at >= ?", since)) if since
      dataset = dataset.where(Sequel.lit("messages.created_at <= ?", until_)) if until_

      dataset
        .order(Sequel.desc(Sequel[:messages][:created_at]), Sequel.desc(Sequel.lit("messages.rowid")))
        .limit(limit)
        .all
        .map { |row| row.merge(run_id: nil) }
    end
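The listing calls a private `sanitize_fts_query` whose body is not shown. As an illustration only, one common way to make free text safe for an FTS5 MATCH is to wrap each whitespace-separated term in double quotes and double any embedded quote, which is how FTS5 string literals are escaped. The helper names below are hypothetical, and this is not claimed to be what the store does.

```ruby
# Quotes each term as an FTS5 string literal so that operators and
# punctuation in user input are treated as plain text. Space-separated
# quoted terms are combined with an implicit AND by FTS5.
def quote_fts_terms(query)
  query.to_s.split.map { |term| %("#{term.gsub('"', '""')}") }.join(" ")
end

# Same bound as the listing: coerce to Integer and keep it within 1..100.
def normalize_limit(limit)
  limit.to_i.clamp(1, 100)
end

puts quote_fts_terms('retry AND "undo')
puts normalize_limit("500")
```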
#seed_extraction_cursor(session_id) ⇒ Object
Seed/reset this session’s memory-extraction watermark to its current last message (by rowid) so the extractor’s next turn feeds only what is added AFTER this point — not the whole transcript.
Used by fork/branch/compaction, which copy a FULLY-MINED transcript into a fresh child whose cursor starts NULL — without seeding, the child would re-mine the ENTIRE copied transcript on its first turn (MEM-2). The caller MUST have flushed/extracted the source up to its tail first (compaction flushes before copy; #branch_runner now does too), so every copied message is already mined and sealing the cursor at the tail loses nothing.
Sets the cursor to nil for an empty session (the never-extracted state). No-op when the session row is absent. Returns the new cursor id (or nil).
    # File 'lib/rubino/session/store.rb', line 130
    def seed_extraction_cursor(session_id)
      return nil unless @db[:sessions].where(id: session_id).any?

      new_cursor = last_id(session_id)
      @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: new_cursor)
      new_cursor
    end
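A small in-memory model shows why seeding matters for a freshly copied child session. The Hash layout and both helper names are hypothetical; only the `memory_extracted_msg_id` key mirrors the column used in the listing.

```ruby
# Points the session's cursor at its newest message (by rowid) and returns
# the new cursor id; returns nil without changes when the session is absent.
def seed_cursor_sketch(sessions, messages, session_id)
  session = sessions[session_id]
  return nil unless session

  tail = messages
    .select { |msg| msg[:session_id] == session_id }
    .max_by { |msg| msg[:rowid] }
  session[:memory_extracted_msg_id] = tail && tail[:id]
end

# Messages an extractor would still have to mine: everything inserted after
# the cursor, or the whole session when the cursor is nil.
def unmined_after_cursor(sessions, messages, session_id)
  cursor_id = sessions.fetch(session_id)[:memory_extracted_msg_id]
  own = messages
    .select { |msg| msg[:session_id] == session_id }
    .sort_by { |msg| msg[:rowid] }
  cursor = own.find { |msg| msg[:id] == cursor_id }
  cursor ? own.select { |msg| msg[:rowid] > cursor[:rowid] } : own
end

sessions = { "child" => { memory_extracted_msg_id: nil } }
messages = [
  { rowid: 1, id: "m1", session_id: "child" },
  { rowid: 2, id: "m2", session_id: "child" }
]

puts unmined_after_cursor(sessions, messages, "child").size
seed_cursor_sketch(sessions, messages, "child")
puts unmined_after_cursor(sessions, messages, "child").size
```

Before seeding, the whole copied transcript counts as un-mined; after seeding, only messages appended later do.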
#since(session_id, after_id:) ⇒ Object
Returns messages strictly NEWER than after_id, in INSERTION order. Used by the memory extractor’s per-session cursor (#249): feeding only the messages a turn actually added, instead of an overlapping recency window.
Ordering is on the monotonic `rowid`, NOT the wall-clock `created_at`, so a message whose `created_at` regresses (backward clock step, NTP correction, VM suspend) is still seen as “new” and never silently skipped (MEM-3): its rowid is strictly greater than the cursor’s even when its timestamp is smaller. rowid is SQLite’s append-only insertion counter, exactly the “what arrived after the watermark” semantics the cursor wants. A nil/unknown after_id (never-extracted session) returns the whole session in order.
    # File 'lib/rubino/session/store.rb', line 95
    def since(session_id, after_id:)
      cursor_rowid = after_id && @db[:messages]
        .where(id: after_id, session_id: session_id)
        .get(Sequel.lit("rowid"))
      ds = @db[:messages]
        .where(session_id: session_id)
        .order(Sequel.lit("rowid"))
      ds = ds.where(Sequel.lit("rowid > ?", cursor_rowid)) if cursor_rowid
      ds.all.map { |row| hydrate(row) }
    end
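The clock-regression case can be reproduced with a few in-memory rows. `SinceRow`, `REGRESSED_ROWS` and `since_sketch` are hypothetical names for illustration; the logic follows the listing above but runs on an Array instead of a dataset.

```ruby
# Hypothetical in-memory row. The third row was inserted last but carries an
# earlier timestamp, as after a backward clock step.
SinceRow = Struct.new(:rowid, :id, :created_at, keyword_init: true)

REGRESSED_ROWS = [
  SinceRow.new(rowid: 1, id: "a", created_at: "2024-05-01T10:00:10Z"),
  SinceRow.new(rowid: 2, id: "b", created_at: "2024-05-01T10:00:11Z"),
  SinceRow.new(rowid: 3, id: "c", created_at: "2024-05-01T10:00:05Z")
].freeze

# Returns rows inserted strictly after after_id, in insertion order.
# A nil or unknown after_id returns every row.
def since_sketch(rows, after_id:)
  cursor = after_id && rows.find { |row| row.id == after_id }
  ordered = rows.sort_by(&:rowid)
  return ordered unless cursor

  ordered.select { |row| row.rowid > cursor.rowid }
end

puts since_sketch(REGRESSED_ROWS, after_id: "b").map(&:id).inspect
```

A filter on `created_at` greater than that of "b" would return nothing for this data, so row "c" would never reach the extractor.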
#token_sum(session_id) ⇒ Object
Returns estimated token sum for a session.
    # File 'lib/rubino/session/store.rb', line 183
    def token_sum(session_id)
      @db[:messages]
        .where(session_id: session_id)
        .sum(:token_count) || 0
    end