Class: Clacky::Server::SessionRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/session_registry.rb

Overview

SessionRegistry is the single authoritative source for session state.

It owns two concerns:

1. Runtime state  — agent instance, thread, status, pending_task, idle_timer.
2. Session list   — reads from disk (via session_manager) and enriches with
                  live runtime status. `list` is the only place the session
                  list is assembled; no callers should build it elsewhere.

Lazy restore: ensure(session_id) loads a disk session into the registry on demand. All session-specific APIs call this before touching the registry so disk-only sessions (e.g. loaded via loadMore) just work transparently.

Thread safety: all public methods are protected by a Mutex.

Constant Summary collapse

SESSION_TIMEOUT =

24 hours of inactivity before cleanup

24 * 60 * 60

Instance Method Summary collapse

Constructor Details

#initialize(session_manager: nil, session_restorer: nil, agent_config:) ⇒ SessionRegistry

Returns a new instance of SessionRegistry.



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/clacky/server/session_registry.rb', line 23

def initialize(session_manager: nil, session_restorer: nil, agent_config:)
  @sessions         = {}
  @mutex            = Mutex.new
  @session_manager  = session_manager
  @session_restorer = session_restorer
  @agent_config     = agent_config
  # Tracks sessions currently being restored from disk.
  # Other threads calling ensure() for the same id will wait via @restore_cond
  # instead of seeing a half-built session (agent=nil).
  @restoring        = {}
  @restore_cond     = ConditionVariable.new
end

Instance Method Details

#cleanup_stale!Object

Remove sessions idle longer than SESSION_TIMEOUT.



379
380
381
382
383
384
385
386
# File 'lib/clacky/server/session_registry.rb', line 379

def cleanup_stale!
  cutoff = Time.now - SESSION_TIMEOUT
  @mutex.synchronize do
    @sessions.delete_if do |_id, session|
      session[:status] == :idle && session[:updated_at] < cutoff
    end
  end
end

#count_by_status(status) ⇒ Object



388
389
390
391
392
# File 'lib/clacky/server/session_registry.rb', line 388

def count_by_status(status)
  @mutex.synchronize do
    @sessions.count { |_, s| s[:status] == status }
  end
end

#create(session_id:) ⇒ Object

Create a new (empty) session entry and return its id. agent/ui/thread are set later via with_session once they are constructed.

Raises:

  • (ArgumentError)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/clacky/server/session_registry.rb', line 38

def create(session_id:)
  raise ArgumentError, "session_id is required" if session_id.nil? || session_id.empty?

  session = {
    id:                   session_id,
    status:               :idle,
    error:                nil,
    error_code:           nil,
    top_up_url:           nil,
    raw_message:          nil,
    updated_at:           nil,
    agent:                nil,
    ui:                   nil,
    thread:               nil,
    idle_timer:           nil,
    pending_task:         nil,
    pending_working_dir:  nil
  }

  @mutex.synchronize { @sessions[session_id] = session }
  session_id
end

#cron_countObject

Count all cron sessions on disk (not filtered by pagination).



347
348
349
350
# File 'lib/clacky/server/session_registry.rb', line 347

def cron_count
  return 0 unless @session_manager
  @session_manager.all_sessions.count { |s| s_source(s) == "cron" }
end

#delete(session_id) ⇒ Object

Delete a session from registry (and interrupt its thread).



353
354
355
356
357
358
359
360
361
362
# File 'lib/clacky/server/session_registry.rb', line 353

def delete(session_id)
  @mutex.synchronize do
    session = @sessions.delete(session_id)
    return false unless session

    session[:idle_timer]&.cancel
    session[:thread]&.raise(Clacky::AgentInterrupted, "Session deleted")
    true
  end
end

#each_live_agentObject

Yield [session_id, agent, thread] for each session that currently has an in-memory agent. Used by the worker's graceful-shutdown path to flush any unsaved @history (e.g. a user message added at the start of Agent#run that hasn't yet reached the save-on-completion branch in run_agent_task).

The session id list is snapshotted under the mutex so concurrent mutations don't disturb iteration; the yield happens outside the mutex so callers can do slow I/O (JSON serialization, File.write) without blocking other registry operations.



432
433
434
435
436
437
438
439
440
441
# File 'lib/clacky/server/session_registry.rb', line 432

def each_live_agent
  snapshot = @mutex.synchronize do
    @sessions.filter_map do |id, s|
      agent = s[:agent]
      next nil unless agent
      [id, agent, s[:thread]]
    end
  end
  snapshot.each { |id, agent, thread| yield id, agent, thread }
end

#ensure(session_id) ⇒ Object

Ensure a session is in the registry, loading from disk if necessary. Returns true if the session is now available, false if not found anywhere.

Thread-safe: if two threads race on the same session_id, the second one waits for the first to finish restoring (including agent construction) rather than seeing a half-built entry with agent=nil.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/clacky/server/session_registry.rb', line 67

def ensure(session_id)
  session_data = nil

  @mutex.synchronize do
    # Another thread is currently restoring this session (including the case where
    # @registry.create was already called but with_session agent-set is not yet done) —
    # wait for it to finish so callers never see agent=nil.
    if @restoring[session_id]
      @restore_cond.wait(@mutex) until !@restoring[session_id]
      return @sessions.key?(session_id)
    end

    # Already fully ready (not being restored) — fast path.
    return true if @sessions.key?(session_id)

    return false unless @session_manager && @session_restorer

    session_data = @session_manager.load(session_id)
    return false unless session_data

    # Mark as "restore in progress" so concurrent callers wait.
    @restoring[session_id] = true
  end

  # Run the (potentially slow) restore outside the mutex so other sessions
  # are not blocked during agent construction.
  begin
    @session_restorer.call(session_data)
  ensure
    @mutex.synchronize do
      @restoring.delete(session_id)
      @restore_cond.broadcast
    end
  end

  @sessions.key?(session_id)
end

#evict_excess_idle!Object



406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/clacky/server/session_registry.rb', line 406

def evict_excess_idle!
  to_evict = []

  @mutex.synchronize do
    idle = @sessions.select { |_, s| s[:status] == :idle && s[:agent] }
             .sort_by { |_, s| s[:updated_at] || Time.at(0) }

    while idle.size > max_idle_agents
      id, session = idle.shift
      to_evict << [id, session]
    end
  end

  to_evict.each { |id, session| persist_and_release(id, session) }
end

#exist?(session_id) ⇒ Boolean

True if the session exists in registry (runtime).

Returns:

  • (Boolean)


365
366
367
# File 'lib/clacky/server/session_registry.rb', line 365

def exist?(session_id)
  @mutex.synchronize { @sessions.key?(session_id) }
end

#get(session_id) ⇒ Object

Retrieve a session hash by id (returns nil if not found).



125
126
127
# File 'lib/clacky/server/session_registry.rb', line 125

def get(session_id)
  @mutex.synchronize { @sessions[session_id]&.dup }
end

#list(limit: nil, before: nil, q: nil, q_scope: "name", date: nil, type: nil, include_pinned: true) ⇒ Object

Return a session list from disk enriched with live registry status. Sorted by created_at descending (newest first).

Parameters (all optional, independent):

source:  "manual"|"cron"|"channel"|"setup"|nil
       nil = no source filter (all sessions)
profile: "general"|"coding"|nil
       nil = no agent_profile filter
limit:   max sessions to return (applies to NON-PINNED only; see below)
before:  ISO8601 cursor — only sessions with created_at < before
        (also applies to NON-PINNED only; pinned items are a separate
        logical section, they should never be paginated away)
include_pinned: when true (default), all matching pinned sessions are
        always returned on the FIRST page (before == nil) regardless
        of limit. Subsequent pages (before set) contain only
        non-pinned sessions. This guarantees that users who pinned
        an old session always see it at the top of the sidebar,
        even if many newer sessions exist.

Ordering of the returned array:

[ ...all_pinned_matching (newest-first), ...non_pinned (newest-first, limited) ]

source and profile are orthogonal — either can be nil independently.



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/clacky/server/session_registry.rb', line 164

def list(limit: nil, before: nil, q: nil, q_scope: "name", date: nil, type: nil, include_pinned: true)
  return [] unless @session_manager

  live = @mutex.synchronize do
    @sessions.transform_values do |s|
      model_info = s[:agent]&.current_model_info
      live_name  = s[:agent]&.name
      live_name  = nil if live_name&.empty?
    live_cost_source = s[:agent]&.cost_source
    { status: s[:status], error: s[:error], error_code: s[:error_code], top_up_url: s[:top_up_url], raw_message: s[:raw_message],
      updated_at: s[:updated_at]&.iso8601,
      model: model_info&.dig(:model), model_id: model_info&.dig(:id), name: live_name,
      total_tasks: s[:agent]&.total_tasks, total_cost: s[:agent]&.total_cost,
      cost_source: live_cost_source,
      reasoning_effort: s[:agent]&.reasoning_effort,
      latest_latency: s[:agent]&.latest_latency,
      card_model: model_info&.dig(:card_model),
      sub_model: model_info&.dig(:sub_model),
      sub_model_options: sub_model_options_for(model_info) }
    end
  end

  all = @session_manager.all_sessions  # already sorted newest-first

  # ── type filter (replaces old source/profile split) ──────────────────
  # type=coding  → agent_profile == "coding"
  # type=manual/cron/channel/setup → source match (profile=general implied)
  if type
    if type == "coding"
      all = all.select { |s| (s[:agent_profile] || "general").to_s == "coding" }
    else
      all = all.select { |s| s_source(s) == type && (s[:agent_profile] || "general").to_s != "coding" }
    end
  end

  # ── date filter (YYYY-MM-DD, matches created_at prefix) ──────────────
  all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date

  # ── name / id / content search ───────────────────────────────────────
  content_snippets = nil
  if q && !q.empty?
    if q_scope == "content"
      content_snippets = @session_manager.search_content(q)
      if content_snippets.empty?
        all = []
      else
        prefix_set = content_snippets.keys.to_set
        all = all.select { |s| prefix_set.include?((s[:session_id] || "")[0, 8]) }
      end
    else
      q_down = q.downcase
      id_match_eligible = q_down.match?(/\A[0-9a-f]{6,}\z/)
      all = all.select { |s|
        (s[:name] || "").downcase.include?(q_down) ||
          (id_match_eligible && (s[:session_id] || "").downcase.include?(q_down))
      }
    end
  end

  # ── Split pinned vs non-pinned BEFORE applying `before`/`limit`.
  # Pinned sessions bypass pagination entirely so an old pinned session
  # never falls off the first page just because newer sessions exist.
  # (Regression fix for 0.9.37: previously `all_sessions` was only
  # sorted by created_at and `limit` cut off old pinned rows, making
  # them invisible until the user clicked "load more".)
  pinned, non_pinned = all.partition { |s| s[:pinned] }

  # `before` cursor ONLY applies to non-pinned (paginated) sessions.
  # Cursor field must match the sort key in all_sessions (updated_at,
  # falling back to created_at for legacy rows).
  non_pinned = non_pinned.select { |s| (s[:updated_at] || s[:created_at] || "") < before } if before
  non_pinned = non_pinned.first(limit) if limit

  # Pinned section: only included on the first page (before == nil) so
  # "load more" responses don't re-send them. On first page, return ALL
  # matching pinned sessions regardless of limit.
  pinned_section = (include_pinned && before.nil?) ? pinned : []

  ordered = pinned_section + non_pinned

  ordered.map do |s|
    row = build_enriched_row(s, live[s[:session_id]])
    if content_snippets
      short = (s[:session_id] || "")[0, 8]
      snip = content_snippets[short]
      row[:search_snippet] = snip if snip
    end
    row
  end
end

#max_idle_agentsObject



398
399
400
# File 'lib/clacky/server/session_registry.rb', line 398

def max_idle_agents
  @agent_config.max_idle_agents
end

#max_running_agentsObject



394
395
396
# File 'lib/clacky/server/session_registry.rb', line 394

def max_running_agents
  @agent_config.max_running_agents
end

#restore_from_disk(n: 2) ⇒ Object

Restore at most n sessions per source as a hot cache at startup. Everything else is loaded on demand via ensure(id).



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/clacky/server/session_registry.rb', line 107

def restore_from_disk(n: 2)
  return unless @session_manager && @session_restorer

  all = @session_manager.all_sessions
    .sort_by { |s| s[:created_at] || "" }
    .reverse

  counts = Hash.new(0)
  all.each do |session_data|
    src = (session_data[:source] || "manual").to_s
    next if counts[src] >= n
    next if exist?(session_data[:session_id])
    @session_restorer.call(session_data)
    counts[src] += 1
  end
end

#running_full?Boolean

Returns:

  • (Boolean)


402
403
404
# File 'lib/clacky/server/session_registry.rb', line 402

def running_full?
  count_by_status(:running) >= max_running_agents
end

#s_source(s) ⇒ Object

Normalize source field from a disk session hash. "system" is a legacy value renamed to "setup" — treat them as equivalent.



339
340
341
342
# File 'lib/clacky/server/session_registry.rb', line 339

def s_source(s)
  src = (s[:source] || "manual").to_s
  src == "system" ? "setup" : src
end

#session_summary(session_id) ⇒ Object

Build a summary hash for API responses (for in-registry sessions). Used when we need live agent fields (name, cost, etc.) after ensure().



461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/clacky/server/session_registry.rb', line 461

def session_summary(session_id)
  session = @mutex.synchronize { @sessions[session_id] }
  return nil unless session
  agent = session[:agent]
  return nil unless agent

  model_info = agent.current_model_info

  {
    id:              session[:id],
    name:            agent.name,
    working_dir:     agent.working_dir,
    status:          session[:status],
    created_at:      agent.created_at,
    updated_at:      session[:updated_at]&.iso8601 || agent.created_at,
    total_tasks:     agent.total_tasks || 0,
    total_cost:      agent.total_cost  || 0.0,
    cost_source:     agent.cost_source.to_s,
    error:           session[:error],
    model:           model_info&.dig(:model),
    permission_mode: agent.permission_mode,
    source:          agent.source.to_s,
    agent_profile:   agent.agent_profile.name,
    pinned:          agent.pinned || false,
    latest_latency:  agent.latest_latency,
  }
end

#snapshot(session_id) ⇒ Object

Return the same enriched hash that a list row would produce, for a single session — merging on-disk fields with in-memory live fields. Returns nil if the session is unknown on disk.

This is the targeted, O(1) counterpart to list used by the WS layer when it only needs one row (e.g. pushing a fresh snapshot to a client that just (re)subscribed, or broadcasting a status-change update).



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/clacky/server/session_registry.rb', line 262

def snapshot(session_id)
  return nil unless @session_manager
  disk = @session_manager.load(session_id)
  return nil unless disk

  live = @mutex.synchronize do
    s = @sessions[session_id]
    next nil unless s
    model_info = s[:agent]&.current_model_info
    live_name  = s[:agent]&.name
    live_name  = nil if live_name&.empty?
    { status: s[:status], error: s[:error], error_code: s[:error_code], top_up_url: s[:top_up_url], raw_message: s[:raw_message],
      updated_at: s[:updated_at]&.iso8601,
      model: model_info&.dig(:model), model_id: model_info&.dig(:id),
      name: live_name, total_tasks: s[:agent]&.total_tasks,
      total_cost: s[:agent]&.total_cost, cost_source: s[:agent]&.cost_source,
      reasoning_effort: s[:agent]&.reasoning_effort,
      latest_latency: s[:agent]&.latest_latency,
      card_model: model_info&.dig(:card_model),
      sub_model: model_info&.dig(:sub_model),
      sub_model_options: sub_model_options_for(model_info) }
  end

  build_enriched_row(disk, live)
end

#update(session_id, **fields) ⇒ Object

Update arbitrary runtime fields of a session (status, error, pending_*, etc.).



130
131
132
133
134
135
136
137
138
139
# File 'lib/clacky/server/session_registry.rb', line 130

def update(session_id, **fields)
  @mutex.synchronize do
    session = @sessions[session_id]
    return false unless session

    fields[:updated_at] = Time.now
    session.merge!(fields)
    true
  end
end

#with_session(session_id) ⇒ Object

Execute a block with exclusive access to the raw session hash.



370
371
372
373
374
375
376
# File 'lib/clacky/server/session_registry.rb', line 370

def with_session(session_id)
  @mutex.synchronize do
    session = @sessions[session_id]
    return nil unless session
    yield session
  end
end