Class: Rubino::Session::Store

Inherits:
Object
  • Object
Defined in:
lib/rubino/session/store.rb

Overview

Persists and queries messages within a session.

Ordering note: created_at is ISO 8601 with second precision, so multiple messages can share the same timestamp. Read/delete paths that need a strict total order break ties on the SQLite `rowid` column.

#last_for_role is the entry point used by retry/undo to find the last user (or assistant) turn before rewinding history.
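
The tie-break described in the ordering note can be illustrated without a database. The following is a minimal sketch using plain Ruby objects; `OrderedRow` and `chronological` are hypothetical names for illustration only and are not part of Rubino.

```ruby
# Hypothetical in-memory stand-in for a row of the messages table.
OrderedRow = Struct.new(:rowid, :role, :created_at)

# Sort on created_at first, then on rowid, so rows that share a
# second-precision timestamp still come back in insertion order.
def chronological(rows)
  rows.sort_by { |r| [r.created_at, r.rowid] }
end

rows = [
  OrderedRow.new(2, "tool",      "2024-01-01T12:00:00Z"),
  OrderedRow.new(1, "assistant", "2024-01-01T12:00:00Z"), # same second as rowid 2
  OrderedRow.new(3, "user",      "2024-01-01T12:00:05Z")
]

puts chronological(rows).map(&:role).inspect
# => ["assistant", "tool", "user"]
```

Sorting on created_at alone would leave the relative order of the first two rows unspecified, which is the situation the rowid tie-break is meant to rule out.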


Constructor Details

#initialize(db: nil) ⇒ Store

Returns a new instance of Store.



# File 'lib/rubino/session/store.rb', line 16

def initialize(db: nil)
  @db = db || Rubino.database.db
end

Instance Method Details

#append(message) ⇒ Object

Appends a message to a session

Raises:

  • (SessionError)

    if the message fails its valid? check



# File 'lib/rubino/session/store.rb', line 21

def append(message)
  raise SessionError, "Invalid message" unless message.valid?

  @db[:messages].insert(message.to_row)
  message
end

#copy_into(target_session_id, messages) ⇒ Object

Copies messages into another session preserving ALL wire-significant fields. Assistant tool calls live in metadata (not tool_call_id), so dropping metadata orphans the toolUse block and 400s strict providers (Anthropic/Bedrock) on resume. token_count is copied too so the target session’s budget accounting stays accurate.



# File 'lib/rubino/session/store.rb', line 44

def copy_into(target_session_id, messages)
  messages.each do |msg|
    create(
      session_id: target_session_id,
      role: msg.role,
      content: msg.content,
      tool_name: msg.tool_name,
      tool_call_id: msg.tool_call_id,
      token_count: msg.token_count,
      metadata: msg.
    )
  end
end

#count(session_id) ⇒ Object

Returns total message count for a session



# File 'lib/rubino/session/store.rb', line 166

def count(session_id)
  @db[:messages].where(session_id: session_id).count
end

#count_by_role(session_id) ⇒ Object

Message count for a session broken down by role (#382), so `sessions show` can report the REAL cumulative count and label how many rows are tool messages. The cached sessions.message_count column only tracks top-level turns and hides every assistant(tool_use)/tool(result) row. Returns a Hash of role => count.



# File 'lib/rubino/session/store.rb', line 175

def count_by_role(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .group_and_count(:role)
    .to_hash(:role, :count)
end
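
As a rough illustration of the shape of the returned Hash, the same grouping can be sketched over a plain array of role strings. `role_counts` is a hypothetical helper for this example and the roles listed are made-up data.

```ruby
# Group a list of role strings and count each group, mirroring the
# role => count Hash described above (in-memory sketch, no database).
def role_counts(roles)
  roles.tally
end

roles  = %w[user assistant tool tool assistant]
counts = role_counts(roles)

puts counts.inspect               # => {"user"=>1, "assistant"=>2, "tool"=>2}
puts counts.values.sum            # total rows across all roles
puts counts.fetch("tool", 0)      # how many of those rows are tool messages
```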

#create(session_id:, role:, content:, **attrs) ⇒ Object

Creates and appends a message from attributes



# File 'lib/rubino/session/store.rb', line 29

def create(session_id:, role:, content:, **attrs)
  message = Message.new(
    session_id: session_id,
    role: role,
    content: content,
    **attrs
  )
  append(message)
end

#delete_from_inclusive(session_id, from_id:) ⇒ Integer

Deletes the given message and every message inserted after it. Used by undo/retry/rewind to rewind history.

Cuts on the monotonic `rowid` (insertion order), the same total order #since now uses, so the rewind point is unambiguous even if later messages carry an earlier `created_at` than from_id (clock skew).

After the cut, the memory-extraction watermark is CLAMPED (MEM-1, R1-M2): the cursor message may itself have just been deleted, leaving a dangling watermark that made the next extraction re-mine the whole remaining session, which could resurrect a fact the user just `forget`-ed. We repair it WITHOUT moving it forward, so a surviving but not-yet-mined message between the old cursor and the cut is never sealed/lost (see #reseed_extraction_cursor_clamped).

Parameters:

  • session_id (String)
  • from_id (String)

    id of the first message to delete

Returns:

  • (Integer)

    number of rows removed



# File 'lib/rubino/session/store.rb', line 207

def delete_from_inclusive(session_id, from_id:)
  from_rowid = @db[:messages]
               .where(id: from_id, session_id: session_id)
               .get(Sequel.lit("rowid"))
  return 0 unless from_rowid

  removed = @db[:messages]
            .where(session_id: session_id)
            .where(Sequel.lit("rowid >= ?", from_rowid))
            .delete
  reseed_extraction_cursor_clamped(session_id)
  removed
end
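
To show why the cut is made on rowid rather than created_at, here is a small in-memory sketch. `CutRow` and `cut_from_inclusive` are hypothetical names for illustration; the sketch models only the row selection, not the watermark repair.

```ruby
# Hypothetical stand-in for a messages row.
CutRow = Struct.new(:rowid, :id, :created_at)

# Returns [survivors, removed_count], mirroring "rowid >= from_rowid".
# An unknown from_id removes nothing, like the early `return 0` above.
def cut_from_inclusive(rows, from_id)
  from = rows.find { |r| r.id == from_id }
  return [rows, 0] unless from

  survivors = rows.select { |r| r.rowid < from.rowid }
  [survivors, rows.size - survivors.size]
end

rows = [
  CutRow.new(1, "a", "2024-01-01T12:00:10Z"),
  CutRow.new(2, "b", "2024-01-01T12:00:20Z"),
  CutRow.new(3, "c", "2024-01-01T12:00:05Z") # inserted last, clock stepped back
]

survivors, removed = cut_from_inclusive(rows, "b")
puts survivors.map(&:id).inspect # => ["a"]
puts removed                     # => 2
```

A cut on `created_at >= b.created_at` would have left message "c" in place even though it was inserted after "b".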

#for_session(session_id, limit: nil) ⇒ Object

Returns all messages for a session in chronological order. created_at is second-precision, so we tie-break on rowid — without this, an assistant preamble and a tool_result persisted in the same second can come back swapped, which makes the resumed transcript look like the tool fired before the model’s preamble (or worse, like an empty assistant box wrapping the tool).



# File 'lib/rubino/session/store.rb', line 64

def for_session(session_id, limit: nil)
  dataset = @db[:messages]
            .where(session_id: session_id)
            .order(:created_at, Sequel.lit("rowid"))
  dataset = dataset.limit(limit) if limit
  dataset.all.map { |row| hydrate(row) }
end

#last_for_role(session_id, role) ⇒ Object

Returns the most recent message for `role` (e.g. "user", "assistant"). Tie-broken on rowid like the other read paths. Used by retry/undo.



# File 'lib/rubino/session/store.rb', line 265

def last_for_role(session_id, role)
  row = @db[:messages]
        .where(session_id: session_id, role: role)
        .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
        .first
  row && hydrate(row)
end

#last_id(session_id) ⇒ Object

The id of the newest message in a session (by insertion `rowid`), or nil for an empty session. Used to advance/seed the memory-extraction cursor. It orders on rowid (not created_at) so the watermark tracks insertion order and a backdated tail message still becomes the new cursor.



# File 'lib/rubino/session/store.rb', line 110

def last_id(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .order(Sequel.desc(Sequel.lit("rowid")))
    .get(:id)
end

#recent(session_id, count:) ⇒ Object

Returns the N most recent messages for a session



# File 'lib/rubino/session/store.rb', line 73

def recent(session_id, count:)
  @db[:messages]
    .where(session_id: session_id)
    .order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
    .limit(count)
    .all
    .reverse
    .map { |row| hydrate(row) }
end
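
The query above sorts newest-first, takes `count` rows, then reverses them so callers receive the window in chronological order. A minimal in-memory sketch of that pattern follows; `RecentRow` and `most_recent` are hypothetical names used only for this example.

```ruby
# Hypothetical stand-in for a messages row.
RecentRow = Struct.new(:rowid, :created_at)

# Newest-first sort, take the first `count`, then flip back to oldest-first.
def most_recent(rows, count)
  rows
    .sort_by { |r| [r.created_at, r.rowid] }
    .reverse
    .first(count)
    .reverse
end

rows = [
  RecentRow.new(1, "2024-01-01T12:00:00Z"),
  RecentRow.new(2, "2024-01-01T12:00:01Z"),
  RecentRow.new(3, "2024-01-01T12:00:01Z"), # same second as rowid 2
  RecentRow.new(4, "2024-01-01T12:00:02Z")
]

puts most_recent(rows, 2).map(&:rowid).inspect # => [3, 4]
```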

#reseed_extraction_cursor_clamped(session_id) ⇒ Object

Repair the memory-extraction watermark after a DELETE (undo/retry rewind) without ever sealing an un-mined survivor — the cursor only ever moves BACKWARD here, never forward (MEM-1, R1-M2).

The cursor means “every message up to and including this rowid has been mined”. A delete can leave it dangling (the cursor message itself was cut). Naively re-seeding to the new tail would jump the watermark PAST any surviving message that sat between the old cursor and the cut point — those were never extracted, and sealing them silently drops their facts.

So we clamp: the new cursor is the newest SURVIVING message whose rowid is <= the old cursor’s rowid (the last position we KNOW was mined).

* old cursor still survives -> unchanged (later survivors stay un-mined and get extracted next turn);
* old cursor was deleted -> falls back to the newest survivor at-or-before it (every survivor predates the cut, so this is the new tail);
* old cursor was nil -> stays nil (never-extracted: re-mine all).

No-op when the session row is absent. Returns the (possibly unchanged) id.



# File 'lib/rubino/session/store.rb', line 156

def reseed_extraction_cursor_clamped(session_id)
  return nil unless @db[:sessions].where(id: session_id).any?

  old_cursor = @db[:sessions].where(id: session_id).get(:memory_extracted_msg_id)
  clamped = newest_surviving_at_or_before(session_id, old_cursor)
  @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: clamped)
  clamped
end
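
The three clamp cases can be modelled in a few lines of plain Ruby. This is only a sketch: the real `newest_surviving_at_or_before` helper is not shown on this page, so `clamp_cursor` below is a hypothetical stand-in, and it assumes the old cursor's rowid is already known to the caller.

```ruby
# Hypothetical stand-in for a surviving messages row.
SurvivorRow = Struct.new(:rowid, :id)

# survivors:        rows still present after the delete
# old_cursor_rowid: rowid of the previous watermark, or nil if never extracted
# Returns the id of the newest survivor at or before the old cursor, or nil.
def clamp_cursor(survivors, old_cursor_rowid)
  return nil if old_cursor_rowid.nil?

  candidate = survivors
              .select { |r| r.rowid <= old_cursor_rowid }
              .max_by(&:rowid)
  candidate && candidate.id
end

three = [SurvivorRow.new(1, "m1"), SurvivorRow.new(2, "m2"), SurvivorRow.new(3, "m3")]
two   = three.first(2)

puts clamp_cursor(three, 2).inspect   # cursor survives: "m2", m3 stays un-mined
puts clamp_cursor(two, 4).inspect     # cursor was deleted: falls back to "m2"
puts clamp_cursor(two, nil).inspect   # never extracted: nil
```

In none of these cases does the result point past the old cursor, which is the backward-only property the description relies on.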

#search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20) ⇒ Array<Hash>

Full-text search across messages backed by the `messages_fts` FTS5 virtual table (see migration 007). Returns hydrated rows with an FTS5 snippet() highlighting the match. Filters compose on top of the FTS MATCH so the index does the heavy lifting and SQL prunes the rest.

Parameters:

  • query (String)

    FTS5 MATCH expression; sanitized via Quoting

  • since (String, nil) (defaults to: nil)

    iso8601 lower bound on created_at

  • until_ (String, nil) (defaults to: nil)

    iso8601 upper bound on created_at

  • role (String, nil) (defaults to: nil)

    restrict to a specific message role

  • tool (String, nil) (defaults to: nil)

    restrict to a specific tool_name

  • limit (Integer) (defaults to: 20)

    cap on rows returned (max 100)

Returns:

  • (Array<Hash>)

    rows: session_id, run_id (nil — not tracked on messages), message_id, role, snippet, created_at



# File 'lib/rubino/session/store.rb', line 234

def search(query:, since: nil, until_: nil, role: nil, tool: nil, limit: 20)
  return [] if query.nil? || query.to_s.strip.empty?

  limit = limit.to_i.clamp(1, 100)
  match_query = sanitize_fts_query(query)

  dataset = @db[:messages_fts]
            .where(Sequel.lit("messages_fts MATCH ?", match_query))
            .join(:messages, Sequel[:messages][:rowid] => Sequel[:messages_fts][:rowid])
            .select(
              Sequel[:messages][:id].as(:message_id),
              Sequel[:messages][:session_id],
              Sequel[:messages][:role],
              Sequel[:messages][:created_at],
              Sequel.lit("snippet(messages_fts, 0, '<mark>', '</mark>', '...', 16) AS snippet")
            )

  dataset = dataset.where(Sequel[:messages][:role] => role) if role
  dataset = dataset.where(Sequel[:messages][:tool_name] => tool) if tool
  dataset = dataset.where(Sequel.lit("messages.created_at >= ?", since)) if since
  dataset = dataset.where(Sequel.lit("messages.created_at <= ?", until_)) if until_

  dataset
    .order(Sequel.desc(Sequel[:messages][:created_at]), Sequel.desc(Sequel.lit("messages.rowid")))
    .limit(limit)
    .all
    .map { |row| row.merge(run_id: nil) }
end
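
The two input guards at the top of the method (blank query, limit capped at 100) can be exercised in isolation. The helpers below are hypothetical extractions written for this example; they reuse the same expressions as the method body but are not part of the class.

```ruby
# Same expression as the early return in #search: nil, empty, and
# whitespace-only queries are all treated as "no query".
def blank_query?(query)
  query.nil? || query.to_s.strip.empty?
end

# Same expression as the limit handling in #search: coerce to Integer,
# then force the value into the 1..100 range.
def normalized_limit(limit)
  limit.to_i.clamp(1, 100)
end

puts blank_query?("   ")      # => true
puts blank_query?("rowid")    # => false
puts normalized_limit(500)    # => 100
puts normalized_limit(0)      # => 1
puts normalized_limit("50")   # => 50
```

One consequence worth noting: a nil or non-numeric limit coerces to 0 and is therefore raised to 1, not to the default of 20.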

#seed_extraction_cursor(session_id) ⇒ Object

Seed/reset this session’s memory-extraction watermark to its current last message (by rowid) so the extractor’s next turn feeds only what is added AFTER this point — not the whole transcript.

Used by fork/branch/compaction, which copy a FULLY-MINED transcript into a fresh child whose cursor starts NULL — without seeding, the child would re-mine the ENTIRE copied transcript on its first turn (MEM-2). The caller MUST have flushed/extracted the source up to its tail first (compaction flushes before copy; #branch_runner now does too), so every copied message is already mined and sealing the cursor at the tail loses nothing.

Sets the cursor to nil for an empty session (the never-extracted state). No-op when the session row is absent. Returns the new cursor id (or nil).



# File 'lib/rubino/session/store.rb', line 130

def seed_extraction_cursor(session_id)
  return nil unless @db[:sessions].where(id: session_id).any?

  new_cursor = last_id(session_id)
  @db[:sessions].where(id: session_id).update(memory_extracted_msg_id: new_cursor)
  new_cursor
end

#since(session_id, after_id:) ⇒ Object

Returns messages strictly NEWER than after_id, in INSERTION order. Used by the memory extractor’s per-session cursor (#249): feeding only the messages a turn actually added, instead of an overlapping recency window.

Ordering is on the monotonic `rowid`, NOT the wall-clock `created_at`, so a message whose `created_at` regresses (backward clock step, NTP correction, VM suspend) is still seen as "new" and never silently skipped (MEM-3): its rowid is strictly greater than the cursor's even when its timestamp is smaller. rowid is SQLite's append-only insertion counter, exactly the "what arrived after the watermark" semantics the cursor wants. A nil/unknown after_id (never-extracted session) returns the whole session in order.



# File 'lib/rubino/session/store.rb', line 95

def since(session_id, after_id:)
  cursor_rowid = after_id && @db[:messages]
                 .where(id: after_id, session_id: session_id)
                 .get(Sequel.lit("rowid"))
  ds = @db[:messages]
       .where(session_id: session_id)
       .order(Sequel.lit("rowid"))
  ds = ds.where(Sequel.lit("rowid > ?", cursor_rowid)) if cursor_rowid
  ds.all.map { |row| hydrate(row) }
end
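
The difference between a rowid cursor and a timestamp cursor shows up as soon as a clock steps backward. The sketch below models that case in memory; `SinceRow` and `newer_than` are hypothetical names for illustration and do not touch the real table.

```ruby
# Hypothetical stand-in for a messages row.
SinceRow = Struct.new(:rowid, :id, :created_at)

# Rows inserted after `after_id`, in insertion order. A nil or unknown
# after_id yields every row, matching the behaviour described above.
def newer_than(rows, after_id)
  cursor   = after_id && rows.find { |r| r.id == after_id }
  selected = cursor ? rows.select { |r| r.rowid > cursor.rowid } : rows
  selected.sort_by(&:rowid)
end

rows = [
  SinceRow.new(1, "m1", "2024-01-01T12:00:10Z"),
  SinceRow.new(2, "m2", "2024-01-01T12:00:20Z"),
  SinceRow.new(3, "m3", "2024-01-01T12:00:05Z") # inserted last, timestamp regressed
]

puts newer_than(rows, "m2").map(&:id).inspect # => ["m3"]
puts newer_than(rows, nil).map(&:id).inspect  # => ["m1", "m2", "m3"]
```

A filter on `created_at > m2.created_at` would return nothing here, so "m3" would never reach the extractor.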

#token_sum(session_id) ⇒ Object

Returns estimated token sum for a session



# File 'lib/rubino/session/store.rb', line 183

def token_sum(session_id)
  @db[:messages]
    .where(session_id: session_id)
    .sum(:token_count) || 0
end