Class: Rubino::Session::Repository

Inherits:
Object
Defined in:
lib/rubino/session/repository.rb

Overview

Thin CRUD wrapper over the `sessions` table. All session persistence goes through this class; callers should not touch the dataset directly.

Notes:

  • #find supports prefix matching on the UUID so short ids from the CLI resolve to a full session row.

  • #latest_active is used to resume the most recently touched session.

  • #destroy! cascades manually to events, tool_calls, messages, session_summaries and runs inside a single transaction (no FK cascade in schema; the runs FK would otherwise block the session delete).

Constant Summary

LIKE_ESCAPE = "\\"

LIKE-pattern escape character for the SAFE id-prefix match (#333a): `%` and `_` are LIKE wildcards, so an unescaped `find("%")` matched EVERY session. id_prefix_match escapes the metacharacters and declares this character as the explicit ESCAPE char, so only the trailing `%` we append acts as a wildcard.

TITLE_MIN_CHARS = 3

A first prompt shorter than this is junk for titling purposes (#128): a throwaway "y"/"ok" that the user immediately interrupted would otherwise become the session title and a useless one-character `--resume "y"` matcher.
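The escaping that LIKE_ESCAPE supports can be sketched in plain Ruby. This is only an illustration: the module and method names below are hypothetical, and the private id_prefix_match helper (not shown on this page) may be implemented differently.

```ruby
# Hypothetical sketch; the real logic lives in the private
# id_prefix_match helper, which this page does not show.
module LikePrefixSketch
  LIKE_ESCAPE = "\\"

  module_function

  # Prefix every LIKE metacharacter (and the escape char itself) with the
  # escape char so the database matches it literally.
  def escape(prefix)
    prefix.to_s.gsub(/[\\%_]/) { |char| LIKE_ESCAPE + char }
  end

  # Only the trailing "%" appended here remains a wildcard.
  def pattern(prefix)
    "#{escape(prefix)}%"
  end
end

puts LikePrefixSketch.pattern("abc12345") # prints abc12345%
puts LikePrefixSketch.pattern("%")        # prints \%%
puts LikePrefixSketch.pattern("a_b")      # prints a\_b%
```

The resulting pattern would then be passed to a LIKE clause that declares the same escape character, so a query of `%` can only match an id that literally starts with a percent sign.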

Constructor Details

#initialize(db: nil) ⇒ Repository

Returns a new instance of Repository.



# File 'lib/rubino/session/repository.rb', line 69

def initialize(db: nil)
  @db = db || Rubino.database.db
end

Class Method Details

.derive_title(text, max: 60) ⇒ Object

Derives a short, human-readable session title from the first user message. Deterministic and model-free (#103): collapse whitespace, strip a leading slash-command word, take the first line, and truncate on a word boundary. Returns nil for empty/blank input — and for junk-short input (#128) — so the caller leaves the session untitled; the next MEANINGFUL prompt titles it instead (Lifecycle#maybe_set_title retries every turn until a title sticks), and the resume hint falls back to the session id.



# File 'lib/rubino/session/repository.rb', line 349

def self.derive_title(text, max: 60)
  cleaned = text.to_s.split("\n").first.to_s.strip.gsub(/\s+/, " ")
  cleaned = cleaned.sub(%r{\A/\S+\s*}, "") # drop a leading slash command
  return nil if cleaned.length < TITLE_MIN_CHARS
  return cleaned if cleaned.length <= max

  truncated = cleaned[0, max].sub(/\s+\S*\z/, "")
  truncated = cleaned[0, max] if truncated.empty?
  "#{truncated}"
end
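A few sample inputs make the rules concrete. The sketch below copies the method body from the listing above into a hypothetical TitleSketch module so that it runs on its own; the inputs are invented for illustration.

```ruby
# Standalone copy of the steps in the listing above, wrapped in a
# hypothetical module so the example runs without the Rubino codebase.
module TitleSketch
  TITLE_MIN_CHARS = 3

  module_function

  def derive_title(text, max: 60)
    cleaned = text.to_s.split("\n").first.to_s.strip.gsub(/\s+/, " ")
    cleaned = cleaned.sub(%r{\A/\S+\s*}, "") # drop a leading slash command
    return nil if cleaned.length < TITLE_MIN_CHARS
    return cleaned if cleaned.length <= max

    truncated = cleaned[0, max].sub(/\s+\S*\z/, "") # cut back to a word boundary
    truncated = cleaned[0, max] if truncated.empty?
    truncated
  end
end

p TitleSketch.derive_title("ok")                              # => nil
p TitleSketch.derive_title("/plan  refactor the repository")  # => "refactor the repository"
p TitleSketch.derive_title("Fix the flaky test\nmore detail") # => "Fix the flaky test"
p TitleSketch.derive_title("alpha beta gamma delta", max: 12) # => "alpha beta"
```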

Instance Method Details

#build(source:, model: nil, provider: nil, title: nil, parent_session_id: nil, cwd: default_cwd) ⇒ Object

Builds an UNSAVED session record (in-memory only) with a real id, so the CLI can open `chat` without persisting a row until the user actually sends a message (#144). The row is inserted lazily by #persist! on the first message; a session the user opens and immediately exits never touches the DB, so `/sessions` stays free of (untitled)/0-msg junk.



# File 'lib/rubino/session/repository.rb', line 104

def build(source:, model: nil, provider: nil, title: nil, parent_session_id: nil, cwd: default_cwd)
  now = Time.now.utc.iso8601
  {
    id: generate_id,
    parent_session_id: parent_session_id,
    source: source,
    model: model,
    provider: provider,
    title: title,
    status: "active",
    cwd: cwd,
    message_count: 0,
    token_count: 0,
    created_at: now,
    updated_at: now,
    persisted: false
  }
end

#claim_for_resume!(row) ⇒ Object

Atomically claims a resumable session for THIS process (#390/residual #376). The explicit-resume guard used to be a check-then-stamp: `owned_by_other_live_process?` read owner_pid, and a LATER `update(id, owner_pid:)` stamped it. That left a TOCTOU window in which two concurrent `--resume <id>` calls both read the SAME dead owner_pid, both passed the guard, and both stamped and wrote the row, interleaving (user,user …) into one malformed transcript.

This method collapses the read and the stamp into a single compare-and-swap, mirroring Jobs::Queue#claim!: it stamps owner_pid only WHILE the row still carries the owner we saw (nil or the dead pid), so exactly ONE racer's UPDATE matches and the loser sees rowcount 0 and forks. A LIVE foreign owner is rejected up front (returns false), so the second resumer still forks rather than stomping the live writer.

Returns true iff THIS process won the claim. The observed owner is `row[:owner_pid]` (`seen` in the code), read from the row the caller passes in so that the CAS targets exactly that value. When it is nil, ours, or dead, the claim is attempted; when it is alive and foreign, the claim is refused without touching the row.



# File 'lib/rubino/session/repository.rb', line 47

def claim_for_resume!(row) # rubocop:disable Naming/PredicateMethod -- mutating CAS (bang); the boolean reports whether THIS caller won the claim
  return false if live_owned_by_other?(row)

  seen = row[:owner_pid]
  now  = Time.now.utc.iso8601
  # CAS: only stamp if the row STILL carries the owner we saw. `seen` is
  # nil (unowned) or a dead pid; either way a concurrent winner has
  # already changed owner_pid to its own live pid, so this WHERE misses.
  cond = seen.nil? ? { owner_pid: nil } : { owner_pid: seen }
  updated = @db[:sessions]
            .where(id: row[:id])
            .where(cond)
            .update(owner_pid: Process.pid, updated_at: now)
  updated.positive?
end
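The compare-and-swap can be illustrated without a database. The sketch below uses a hypothetical in-memory FakeSessions class whose cas_owner method stands in for the conditional UPDATE; the pids are made up, and the real method additionally rejects live foreign owners before attempting the swap.

```ruby
# Hypothetical in-memory stand-in for the sessions table; the real code
# issues a conditional UPDATE through @db[:sessions].
class FakeSessions
  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  def insert(id, owner_pid)
    @lock.synchronize { @rows[id] = { owner_pid: owner_pid } }
  end

  def owner_of(id)
    @lock.synchronize { @rows.fetch(id)[:owner_pid] }
  end

  # Mimics "UPDATE ... WHERE id = ? AND owner_pid = <seen>" and returns
  # the number of rows changed (0 or 1).
  def cas_owner(id, seen:, new_pid:)
    @lock.synchronize do
      row = @rows[id]
      return 0 if row.nil? || row[:owner_pid] != seen

      row[:owner_pid] = new_pid
      1
    end
  end
end

table = FakeSessions.new
table.insert("s1", 4242) # 4242 stands in for a dead owner's pid

# Both racers observe the same stale owner before either one stamps.
seen_a = table.owner_of("s1")
seen_b = table.owner_of("s1")

won_a = table.cas_owner("s1", seen: seen_a, new_pid: 1001).positive?
won_b = table.cas_owner("s1", seen: seen_b, new_pid: 1002).positive?

p [won_a, won_b]       # => [true, false]
p table.owner_of("s1") # => 1001
```

Because the second swap still targets the stale value 4242, it matches no row and reports 0, which is the signal the loser uses to fork a fresh session.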

#create(source:, model: nil, provider: nil, title: nil, parent_session_id: nil, cwd: default_cwd) ⇒ Object

Creates a new session and returns its record. cwd stamps the launch directory so resume can be scoped per-cwd (r5 MF-4 / C-1); defaults to the current workspace primary root so every session records where it started.



# File 'lib/rubino/session/repository.rb', line 76

def create(source:, model: nil, provider: nil, title: nil, parent_session_id: nil, cwd: default_cwd)
  now = Time.now.utc.iso8601
  id = generate_id

  @db[:sessions].insert(
    id: id,
    parent_session_id: parent_session_id,
    source: source,
    model: model,
    provider: provider,
    title: title,
    status: "active",
    owner_pid: Process.pid,
    cwd: cwd,
    message_count: 0,
    token_count: 0,
    created_at: now,
    updated_at: now
  )

  find(id)
end

#destroy!(id) ⇒ Object

Deletes a session and all related records. Also removes the session’s on-disk spill/paste artifacts (#374), which the DB cascade alone left ORPHANED: oversized pastes live under <home>/sessions/<id>/ and full tool-output spills under <home>/tool-results/<call_id>.txt. The tool_calls’ call_ids are captured BEFORE their rows are deleted so the matching spill files can be removed; the paste subtree is keyed by the session id directly. File removal runs AFTER the transaction commits so a rolled-back delete never strands the DB rows against deleted files.



# File 'lib/rubino/session/repository.rb', line 368

def destroy!(id)
  call_ids = @db[:tool_calls].where(session_id: id).select_map(:id)
  @db.transaction do
    @db[:events].where(session_id: id).delete
    @db[:tool_calls].where(session_id: id).delete
    @db[:messages].where(session_id: id).delete
    @db[:session_summaries].where(session_id: id).delete
    @db[:runs].where(session_id: id).delete
    @db[:sessions].where(id: id).delete
  end
  Util::SpillStore.destroy_session_files(id, call_ids: call_ids)
end

#end_session!(id) ⇒ Object

Ends a session: sets status to "ended", stamps ended_at, and clears owner_pid.



# File 'lib/rubino/session/repository.rb', line 249

def end_session!(id)
  now = Time.now.utc.iso8601
  @db[:sessions].where(id: id).update(
    status: "ended",
    ended_at: now,
    owner_pid: nil,
    updated_at: now
  )
end

#find(id) ⇒ Object

Finds a session by ID. Supports prefix matching, so a short id resolves to the full session row; returns nil when nothing matches.



# File 'lib/rubino/session/repository.rb', line 156

def find(id)
  @db[:sessions].where(id_prefix_match(id)).first
end

#find_by_id_or_title(query) ⇒ Object

Resolves a user-supplied query to a session: tries an ID prefix first (handles "abc12345" style short IDs), then falls back to a case-insensitive substring match across the 50 most recent sessions, against the title AND the full first user message. The stored title is truncated (~60 chars), so a memorable word from the TAIL of a long first prompt would otherwise silently fail to resume (#70). Returns the session row or nil. Centralised so the CLI Runner and the TUI history loader agree on what `--resume <query>` accepts.

Raises AmbiguousSessionError when more than one session matches, so the CLI can show the candidates instead of silently picking the first row (#116, triaged from the audit).



# File 'lib/rubino/session/repository.rb', line 172

def find_by_id_or_title(query)
  return nil if query.nil? || query.to_s.empty?

  id_matches = @db[:sessions].where(id_prefix_match(query)).all
  if id_matches.size > 1
    raise AmbiguousSessionError.new(query, id_matches)
  elsif id_matches.size == 1
    return id_matches.first
  end

  needle = query.to_s.downcase
  title_matches = list(limit: 50).select do |s|
    s[:title]&.downcase&.include?(needle) ||
      first_user_message(s[:id])&.downcase&.include?(needle)
  end
  if title_matches.size > 1
    raise AmbiguousSessionError.new(query, title_matches)
  elsif title_matches.size == 1
    return title_matches.first
  end

  nil
end
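The resolution order can be sketched over plain hashes. Everything named below is hypothetical: AmbiguousMatch stands in for AmbiguousSessionError (whose real interface is not shown here), the `:first_message` key replaces the lookup of the first user message, and the sample rows are invented.

```ruby
# Hypothetical stand-in for AmbiguousSessionError.
class AmbiguousMatch < StandardError
  attr_reader :candidates

  def initialize(query, candidates)
    @candidates = candidates
    super("#{candidates.size} sessions match #{query.inspect}")
  end
end

# Returns the single match from a candidate list, nil for none, or raises
# when the query is ambiguous.
def pick_unique(query, matches)
  raise AmbiguousMatch.new(query, matches) if matches.size > 1

  matches.first
end

# rows: array of hashes with :id, :title and :first_message.
def resolve_session(rows, query)
  return nil if query.to_s.empty?

  # Step 1: id prefix wins outright when it matches anything.
  by_id = rows.select { |row| row[:id].start_with?(query.to_s) }
  return pick_unique(query, by_id) unless by_id.empty?

  # Step 2: case-insensitive substring on title or first message.
  needle = query.to_s.downcase
  by_text = rows.select do |row|
    [row[:title], row[:first_message]].compact.any? { |text| text.downcase.include?(needle) }
  end
  pick_unique(query, by_text)
end

rows = [
  { id: "abc12345-0000", title: "Refactor session repository",
    first_message: "Refactor session repository and add a vacuum task" },
  { id: "abd99999-0000", title: "Fix flaky login test",
    first_message: "Fix flaky login test" }
]

p resolve_session(rows, "abc")[:id]    # => "abc12345-0000"
p resolve_session(rows, "vacuum")[:id] # => "abc12345-0000"
p resolve_session(rows, "FLAKY")[:id]  # => "abd99999-0000"
p resolve_session(rows, "zzz")         # => nil
```

A query of `"ab"` prefixes both ids, so this sketch raises AmbiguousMatch with both rows as candidates rather than picking one.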

#increment_message_count!(id) ⇒ Object

Increments the session's message count by one in a single UPDATE and refreshes updated_at.



# File 'lib/rubino/session/repository.rb', line 233

def increment_message_count!(id)
  @db[:sessions].where(id: id).update(
    message_count: Sequel[:message_count] + 1,
    updated_at: Time.now.utc.iso8601
  )
end

#latest_active ⇒ Object

Returns the most recently updated session whose status is "active", or nil if there is none.



# File 'lib/rubino/session/repository.rb', line 284

def latest_active
  @db[:sessions]
    .where(status: "active")
    .order(Sequel.desc(:updated_at), Sequel.desc(Sequel.lit("rowid")))
    .first
end

#latest_resumable ⇒ Object

Returns the most recent session worth resuming on a bare `chat`: the last session that actually has messages, regardless of status, so a closed terminal (status still "active") OR a cleanly ended session can both be continued. Empty 0-message sessions are skipped so a stray earlier launch never shadows the real conversation (#99). Returns nil on a true first run, which the CLI uses to fall back to the welcome panel.



# File 'lib/rubino/session/repository.rb', line 297

def latest_resumable
  @db[:sessions]
    .where(resumable_predicate)
    .order(Sequel.desc(:updated_at), Sequel.desc(Sequel.lit("rowid")))
    .first
end

#latest_resumable_for_cwd(cwd = default_cwd) ⇒ Object

Bare `chat` / `--continue` auto-resume target, SCOPED to the launch dir (r5 MF-4 / C-1): the latest resumable session whose stored cwd matches the current directory, never the globally-latest. This is what kills "folder B silently resumes folder A": a session started in /api carries cwd=/api and is invisible to a `chat` launched in /web, which instead finds /web's own latest (or nil ⇒ fresh), mirroring Claude Code/Codex's per-cwd picker. Two sessions stamped to DIFFERENT dirs can never resolve to each other, so concurrent instances in different folders don't stomp.

Also excludes sessions a DIFFERENT live process currently owns (status="active" + an alive owner_pid that isn't us): a second tab in the SAME dir must not silently latch onto the session the first tab is still writing (the two-tabs-stomp-one-session bleed). It forks a fresh session instead; the user can still reattach explicitly with `--resume <id>`. Compares on canonical (realpath) paths so a symlinked launch dir matches the stored root. Returns nil ⇒ caller starts fresh.



# File 'lib/rubino/session/repository.rb', line 320

def latest_resumable_for_cwd(cwd = default_cwd)
  target = canonical(cwd)
  return nil if target.nil?

  @db[:sessions]
    .where(resumable_predicate)
    .exclude(cwd: nil)
    .order(Sequel.desc(:updated_at), Sequel.desc(Sequel.lit("rowid")))
    .all
    .find do |row|
      next false unless canonical(row[:cwd]) == target

      # Skip a session another live process is actively writing.
      !live_owned_by_other?(row)
    end
end
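The canonical-path comparison can be sketched as follows. The private `canonical` helper is not shown on this page, so `canonical_path` below is a hypothetical equivalent built on File.realpath; the directory names are invented.

```ruby
require "tmpdir"

# Hypothetical equivalent of the private `canonical` helper: resolve
# symlinks, and return nil for a nil, missing, or unreadable path.
def canonical_path(path)
  return nil if path.nil?

  File.realpath(path)
rescue SystemCallError
  nil
end

Dir.mktmpdir do |root|
  real = File.join(root, "api")
  link = File.join(root, "api-link")
  Dir.mkdir(real)
  File.symlink(real, link)

  # A launch through the symlink resolves to the same stored root.
  puts canonical_path(link) == canonical_path(real) # prints true

  # A directory that no longer exists canonicalizes to nil.
  p canonical_path(File.join(root, "missing")) # => nil
end
```

Comparing resolved paths rather than raw strings is why the cwd filter has to run in Ruby over the fetched rows instead of as a SQL predicate.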

#list(limit: 20, status: nil, search: nil, cwd: nil, include_subagents: false) ⇒ Object

Lists sessions with optional filters. cwd scopes the listing to a single launch directory (#334): a bare `sessions list` defaults to the current dir so a multi-folder user only sees THIS project's sessions, mirroring the per-cwd auto-resume picker; pass cwd: nil (the `--all` flag) to list every directory's sessions as before. Compared on canonical (realpath) paths so a symlinked launch dir still matches the stored root, which means the cwd filter runs in Ruby (not SQL) AFTER the status/search predicates, and the limit is therefore applied post-filter.

Sessions created by the `task` tool's subagent runs are tagged source="subagent" (Agent::Runner session_source). They are internal machinery ("Use the shell tool to run exactly this…" prompt-sessions), not the user's own conversations, so they are EXCLUDED from the user-facing list/picker by default (item 2), the way Claude Code hides its Task subagent sessions. They remain reachable by explicit id via #find / #find_by_id_or_title (which never apply this filter), so a subagent session is still resumable when the id is known.



# File 'lib/rubino/session/repository.rb', line 212

def list(limit: 20, status: nil, search: nil, cwd: nil, include_subagents: false)
  dataset = @db[:sessions].order(Sequel.desc(:created_at), Sequel.desc(Sequel.lit("rowid")))
  dataset = dataset.where(status: status) if status
  dataset = dataset.exclude(source: "subagent") unless include_subagents
  dataset = dataset.where(Sequel.like(:title, "%#{search}%")) if search && !search.empty?

  return dataset.limit(limit).all if cwd.nil?

  target = canonical(cwd)
  return dataset.limit(limit).all if target.nil?

  dataset.all.select { |row| canonical(row[:cwd]) == target }.first(limit)
end
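Why the limit has to come after the cwd filter can be shown with plain arrays. The two helper methods and the sample rows below are hypothetical and only contrast the two orderings; paths are compared as raw strings here for brevity.

```ruby
# Applying the limit first can discard every row from the target directory.
def limit_then_filter(rows, target, limit)
  rows.first(limit).select { |row| row[:cwd] == target }
end

# Filtering first, as the method above does, keeps up to `limit` matches.
def filter_then_limit(rows, target, limit)
  rows.select { |row| row[:cwd] == target }.first(limit)
end

# Newest first; the two most recent sessions belong to another directory.
rows = [
  { id: "s4", cwd: "/web" },
  { id: "s3", cwd: "/web" },
  { id: "s2", cwd: "/api" },
  { id: "s1", cwd: "/api" }
]

p limit_then_filter(rows, "/api", 2).map { |row| row[:id] } # => []
p filter_then_limit(rows, "/api", 2).map { |row| row[:id] } # => ["s2", "s1"]
```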

#owned_by_other_live_process?(row) ⇒ Boolean

Public re-export of the live-owner check (#347): explicit `--resume <id>` (the Runner) needs the SAME stomp guard auto-resume already applies, so a second process resuming a session a first live process is still writing can fork instead of interleaving writes into one malformed transcript. True when this row has an alive owner_pid that isn't us, for a session of ANY status (#376): an ended session a live process is re-writing is guarded too, so concurrent explicit resumes of it fork instead of interleaving.

Returns:

  • (Boolean)


# File 'lib/rubino/session/repository.rb', line 26

def owned_by_other_live_process?(row)
  live_owned_by_other?(row)
end

#persist!(session) ⇒ Object

Inserts a session row built by #build if it isn’t already in the DB. Idempotent: a no-op once persisted (the common per-message path checks this first). Returns the (now persisted) session record.



# File 'lib/rubino/session/repository.rb', line 126

def persist!(session)
  return session if session[:persisted] || persisted?(session[:id])

  @db[:sessions].insert(
    id: session[:id],
    parent_session_id: session[:parent_session_id],
    source: session[:source],
    model: session[:model],
    provider: session[:provider],
    title: session[:title],
    status: session[:status] || "active",
    owner_pid: Process.pid,
    cwd: session[:cwd],
    message_count: 0,
    token_count: 0,
    created_at: session[:created_at] || Time.now.utc.iso8601,
    updated_at: Time.now.utc.iso8601
  )
  session[:persisted] = true
  session
end
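The build-then-persist lifecycle can be sketched with an in-memory store. LazySessionStore is a hypothetical stand-in that keeps rows in a Hash instead of the sessions table, and SecureRandom.uuid is an assumption about how ids are generated.

```ruby
require "securerandom"
require "time"

# Hypothetical in-memory stand-in; the real class inserts into the
# sessions table through @db.
class LazySessionStore
  attr_reader :rows

  def initialize
    @rows = {}
  end

  # Returns an unsaved record that already carries a real id.
  def build(source:)
    now = Time.now.utc.iso8601
    { id: SecureRandom.uuid, source: source, created_at: now, persisted: false }
  end

  # Inserts on the first call only; later calls are no-ops.
  def persist!(session)
    return session if session[:persisted] || @rows.key?(session[:id])

    @rows[session[:id]] = session.reject { |key, _| key == :persisted }
    session[:persisted] = true
    session
  end
end

store = LazySessionStore.new
session = store.build(source: "cli")
p store.rows.size # => 0, opening a chat writes nothing

store.persist!(session) # first message: the row is inserted
store.persist!(session) # later messages: no-op
p store.rows.size # => 1
```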

#persisted?(id) ⇒ Boolean

True when a row with this id exists in the sessions table.

Returns:

  • (Boolean)


# File 'lib/rubino/session/repository.rb', line 149

def persisted?(id)
  return false if id.nil?

  !@db[:sessions].where(id: id).empty?
end

#reap_orphaned_active! ⇒ Object

Reaps orphaned sessions: any row still “active” whose owning process is gone is stamped “ended” (#11). This covers the un-trappable hard kill (SIGKILL) and a closed terminal whose SIGHUP never reached the process, where neither the clean-exit path nor the signal traps ran. Rows owned by a live process (including the current one) and rows with no recorded pid (pre-#11 / future sources) are left untouched. Called lazily before listing/resuming sessions; best-effort, returns the number reaped.



# File 'lib/rubino/session/repository.rb', line 266

def reap_orphaned_active!
  reaped = 0
  @db[:sessions]
    .where(status: "active")
    .exclude(owner_pid: nil)
    .select(:id, :owner_pid)
    .each do |row|
      next if process_alive?(row[:owner_pid])

      end_session!(row[:id])
      reaped += 1
    end
  reaped
rescue StandardError
  reaped
end
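The liveness test that the reaper depends on is a private helper (process_alive?) that this page does not show. One common way to implement such a check is signal 0; the sketch below assumes that approach, and `pid_alive?` is a hypothetical name.

```ruby
require "rbconfig"

# Hypothetical version of the private process_alive? helper.
def pid_alive?(pid)
  Process.kill(0, pid) # signal 0 tests for existence without delivering a signal
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true # the process exists but belongs to another user
end

# Start a short-lived child and wait for it, so its pid refers to a
# process that has already exited.
child = Process.spawn(RbConfig.ruby, "-e", "exit")
Process.wait(child)

puts pid_alive?(Process.pid) # prints true
puts pid_alive?(child)       # prints false, barring immediate pid reuse by the OS
```

Treating EPERM as alive errs on the side of leaving a session untouched, which matches the reaper's best-effort stance of only ending rows whose owner is clearly gone.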

#update(id, **attrs) ⇒ Object

Updates the given attributes on a session and refreshes updated_at.



# File 'lib/rubino/session/repository.rb', line 227

def update(id, **attrs)
  attrs[:updated_at] = Time.now.utc.iso8601
  @db[:sessions].where(id: id).update(attrs)
end

#update_token_count!(id, token_count) ⇒ Object

Sets the session's token count to the given value and refreshes updated_at.



# File 'lib/rubino/session/repository.rb', line 241

def update_token_count!(id, token_count)
  @db[:sessions].where(id: id).update(
    token_count: token_count,
    updated_at: Time.now.utc.iso8601
  )
end