Class: Clacky::Server::SessionRegistry

Inherits:
Object
  • Object
Defined in:
lib/clacky/server/session_registry.rb

Overview

SessionRegistry is the single authoritative source for session state.

It owns two concerns:

1. Runtime state  — agent instance, thread, status, pending_task, idle_timer.
2. Session list   — reads from disk (via session_manager) and enriches with
                    live runtime status. `list` is the only place the session
                    list is assembled; no callers should build it elsewhere.

Lazy restore: `ensure(session_id)` loads a disk session into the registry on demand. All session-specific APIs call this before touching the registry, so disk-only sessions (e.g. loaded via loadMore) work transparently.

Thread safety: all public methods are protected by a Mutex.

Constant Summary

SESSION_TIMEOUT = 24 * 60 * 60

24 hours of inactivity before cleanup (value in seconds).

Constructor Details

#initialize(session_manager: nil, session_restorer: nil) ⇒ SessionRegistry

session_manager:  Clacky::SessionManager instance
session_restorer: callable(session_data) → session_id; builds the agent and wires it into the registry



# File 'lib/clacky/server/session_registry.rb', line 23

def initialize(session_manager: nil, session_restorer: nil)
  @sessions         = {}
  @mutex            = Mutex.new
  @session_manager  = session_manager
  @session_restorer = session_restorer
  # Tracks sessions currently being restored from disk.
  # Other threads calling ensure() for the same id will wait via @restore_cond
  # instead of seeing a half-built session (agent=nil).
  @restoring        = {}
  @restore_cond     = ConditionVariable.new
end

Instance Method Details

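#cleanup_stale! ⇒ Object

Remove sessions whose status is :idle and whose updated_at is older than SESSION_TIMEOUT. Sessions in any other status are kept regardless of age.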



# File 'lib/clacky/server/session_registry.rb', line 248

def cleanup_stale!
  cutoff = Time.now - SESSION_TIMEOUT
  @mutex.synchronize do
    @sessions.delete_if do |_id, session|
      session[:status] == :idle && session[:updated_at] < cutoff
    end
  end
end
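
The staleness rule can be tried in isolation on a plain hash. This is a minimal sketch, not the registry itself: the session ids, the timestamps, and the `:running` status value are made up for illustration.

```ruby
# Plain-hash stand-in for the registry's internal @sessions (illustrative data).
SESSION_TIMEOUT = 24 * 60 * 60 # seconds

now = Time.now
sessions = {
  "old-idle"    => { status: :idle,    updated_at: now - (SESSION_TIMEOUT + 60) },
  "old-running" => { status: :running, updated_at: now - (SESSION_TIMEOUT + 60) },
  "fresh-idle"  => { status: :idle,    updated_at: now - 60 }
}

# Same predicate as cleanup_stale!: idle AND last updated before the cutoff.
cutoff = now - SESSION_TIMEOUT
sessions.delete_if { |_id, s| s[:status] == :idle && s[:updated_at] < cutoff }

remaining = sessions.keys
```

Only the session that is both idle and older than the cutoff is dropped; the old but non-idle session survives.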

#create(session_id:) ⇒ Object

Create a new (empty) session entry and return its id. agent/ui/thread are set later via with_session once they are constructed.

Raises:

  • (ArgumentError)


# File 'lib/clacky/server/session_registry.rb', line 37

def create(session_id:)
  raise ArgumentError, "session_id is required" if session_id.nil? || session_id.empty?

  session = {
    id:                   session_id,
    status:               :idle,
    error:                nil,
    updated_at:           Time.now,
    agent:                nil,
    ui:                   nil,
    thread:               nil,
    idle_timer:           nil,
    pending_task:         nil,
    pending_working_dir:  nil
  }

  @mutex.synchronize { @sessions[session_id] = session }
  session_id
end

#delete(session_id) ⇒ Object

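Delete a session from the registry, cancel its idle timer, and interrupt its thread. Returns true if a session was removed, false if the id was not registered.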



# File 'lib/clacky/server/session_registry.rb', line 222

def delete(session_id)
  @mutex.synchronize do
    session = @sessions.delete(session_id)
    return false unless session

    session[:idle_timer]&.cancel
    session[:thread]&.raise(Clacky::AgentInterrupted, "Session deleted")
    true
  end
end
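
The interruption relies on `Thread#raise`, which delivers an exception into another running thread. The sketch below shows that mechanism on its own; `AgentInterrupted` here is a local stand-in for `Clacky::AgentInterrupted`, and the sleeping worker is an assumed placeholder for an agent task.

```ruby
# Local stand-in for Clacky::AgentInterrupted (assumption for this sketch).
class AgentInterrupted < StandardError; end

ready   = Queue.new
outcome = Queue.new

worker = Thread.new do
  begin
    ready << :started
    sleep # simulates a long-running agent task
  rescue AgentInterrupted => e
    outcome << e.message # the task gets a chance to clean up here
  end
end

ready.pop # wait until the worker is inside the begin block
worker.raise(AgentInterrupted, "Session deleted")
worker.join

message = outcome.pop
```

Because the exception arrives asynchronously, the interrupted code should be prepared to rescue it at any point of its work.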

#ensure(session_id) ⇒ Object

Ensure a session is in the registry, loading from disk if necessary. Returns true if the session is now available, false if not found anywhere.

Thread-safe: if two threads race on the same session_id, the second one waits for the first to finish restoring (including agent construction) rather than seeing a half-built entry with agent=nil.



# File 'lib/clacky/server/session_registry.rb', line 63

def ensure(session_id)
  session_data = nil

  @mutex.synchronize do
    # Another thread is currently restoring this session (including the case where
    # @registry.create was already called but with_session agent-set is not yet done) —
    # wait for it to finish so callers never see agent=nil.
    if @restoring[session_id]
      @restore_cond.wait(@mutex) while @restoring[session_id]
      return @sessions.key?(session_id)
    end

    # Already fully ready (not being restored) — fast path.
    return true if @sessions.key?(session_id)

    return false unless @session_manager && @session_restorer

    session_data = @session_manager.load(session_id)
    return false unless session_data

    # Mark as "restore in progress" so concurrent callers wait.
    @restoring[session_id] = true
  end

  # Run the (potentially slow) restore outside the mutex so other sessions
  # are not blocked during agent construction.
  begin
    @session_restorer.call(session_data)
  ensure
    @mutex.synchronize do
      @restoring.delete(session_id)
      @restore_cond.broadcast
    end
  end

  # Read under the mutex, consistent with the other public methods.
  @mutex.synchronize { @sessions.key?(session_id) }
end
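
The "restore in progress" pattern can be sketched independently of Clacky. `LazyStore` and its loader block are hypothetical names used only for this illustration. Unlike the real restorer, which registers the session itself, this sketch stores the loaded value directly.

```ruby
# Hypothetical, simplified stand-in; not the real SessionRegistry.
class LazyStore
  def initialize(&loader)
    @items   = {}
    @loading = {}
    @mutex   = Mutex.new
    @cond    = ConditionVariable.new
    @loader  = loader
  end

  def ensure_loaded(id)
    @mutex.synchronize do
      if @loading[id]
        # Someone else is loading this id: wait until they finish.
        @cond.wait(@mutex) while @loading[id]
        return @items.key?(id)
      end
      return true if @items.key?(id) # fast path
      @loading[id] = true            # claim the load
    end

    begin
      value = @loader.call(id) # slow work runs without holding the lock
      @mutex.synchronize { @items[id] = value } if value
    ensure
      @mutex.synchronize do
        @loading.delete(id)
        @cond.broadcast # wake every waiter, even if the load failed
      end
    end

    @mutex.synchronize { @items.key?(id) }
  end
end

calls = Queue.new
store = LazyStore.new do |id|
  calls << id
  sleep 0.05 # simulate slow agent construction
  "agent-for-#{id}"
end

results    = 4.times.map { Thread.new { store.ensure_loaded("s1") } }.map(&:value)
load_count = calls.size
```

All four threads report success while the loader runs only once. Waiting in a `while` loop guards against spurious wakeups and against broadcasts meant for a different id.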

#exist?(session_id) ⇒ Boolean

True if the session exists in registry (runtime).

Returns:

  • (Boolean)


# File 'lib/clacky/server/session_registry.rb', line 234

def exist?(session_id)
  @mutex.synchronize { @sessions.key?(session_id) }
end

#get(session_id) ⇒ Object

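Retrieve a copy of a session hash by id (returns nil if not found). The copy is shallow (`dup`), so nested objects such as the agent are still shared with the registry.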



# File 'lib/clacky/server/session_registry.rb', line 122

def get(session_id)
  @mutex.synchronize { @sessions[session_id]&.dup }
end
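
A short sketch of what a shallow `dup` does and does not protect. The `Agent` struct is a hypothetical stand-in for the real agent object, and `:running` is an assumed status value.

```ruby
Agent = Struct.new(:name) # hypothetical stand-in for the real agent object

stored = { id: "s1", status: :idle, agent: Agent.new("draft") }
copy   = stored.dup # the kind of copy that get hands back

copy[:status] = :running     # top-level change: only the copy is affected
copy[:agent].name = "final"  # nested object: shared with the stored hash

stored_status = stored[:status]
stored_name   = stored[:agent].name
```

Reassigning keys on the returned hash leaves the registry untouched, but mutating an object reachable through it does not.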

#list(limit: nil, before: nil, q: nil, date: nil, type: nil) ⇒ Object

Return a session list from disk enriched with live registry status. Sorted by created_at descending (newest first).

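Parameters (all optional, independent):

type:    "coding"|"manual"|"cron"|"channel"|"setup"|nil
         "coding" matches agent_profile == "coding"; any other value
         matches the session source and excludes coding sessions
         nil = no type filter (all sessions)
date:    "YYYY-MM-DD", matched as a prefix of created_at
q:       case-insensitive substring matched against name and session id
limit:   max sessions to return
before:  ISO8601 cursor; only sessions with created_at < before

The type filter replaces the older separate source and profile filters.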



# File 'lib/clacky/server/session_registry.rb', line 150

def list(limit: nil, before: nil, q: nil, date: nil, type: nil)
  return [] unless @session_manager

  live = @mutex.synchronize do
    @sessions.transform_values do |s|
      model_info = s[:agent]&.current_model_info
      live_name  = s[:agent]&.name
      live_name  = nil if live_name&.empty?
      { status: s[:status], error: s[:error], model: model_info&.dig(:model), name: live_name,
        total_tasks: s[:agent]&.total_tasks, total_cost: s[:agent]&.total_cost }
    end
  end

  all = @session_manager.all_sessions  # already sorted newest-first

  # ── type filter (replaces old source/profile split) ──────────────────
  # type=coding  → agent_profile == "coding"
  # type=manual/cron/channel/setup → source match (profile=general implied)
  if type
    if type == "coding"
      all = all.select { |s| (s[:agent_profile] || "general").to_s == "coding" }
    else
      all = all.select { |s| s_source(s) == type && (s[:agent_profile] || "general").to_s != "coding" }
    end
  end

  # ── date filter (YYYY-MM-DD, matches created_at prefix) ──────────────
  all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date

  # ── name / id search ─────────────────────────────────────────────────
  if q && !q.empty?
    q_down = q.downcase
    all = all.select { |s|
      (s[:name] || "").downcase.include?(q_down) ||
        (s[:session_id] || "").downcase.include?(q_down)
    }
  end

  all = all.select { |s| (s[:created_at] || "") < before } if before
  all = all.first(limit) if limit

  all.map do |s|
    id = s[:session_id]
    ls = live[id]
    {
      id:            id,
      name:          ls&.dig(:name) || s[:name] || "",
      status:        ls ? ls[:status].to_s : "idle",
      error:         ls ? ls[:error] : nil,
      model:         ls&.dig(:model),
      source:        s_source(s),
      agent_profile: (s[:agent_profile] || "general").to_s,
      working_dir:   s[:working_dir],
      created_at:    s[:created_at],
      updated_at:    s[:updated_at],
      total_tasks:   ls&.dig(:total_tasks) || s.dig(:stats, :total_tasks) || 0,
      total_cost:    ls&.dig(:total_cost)  || s.dig(:stats, :total_cost_usd) || 0.0,
    }
  end
end
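
The date, search, cursor, and limit steps can be exercised on plain hashes. In this sketch the `page` helper and the sample records are made up for illustration; it assumes all `created_at` values share one ISO8601 format, which is what makes the string comparison against `before` meaningful.

```ruby
# Made-up disk records, already sorted newest-first like all_sessions.
records = [
  { session_id: "c", name: "Fix login bug",  created_at: "2024-05-03T09:00:00Z" },
  { session_id: "b", name: "Nightly report", created_at: "2024-05-02T09:00:00Z" },
  { session_id: "a", name: "fix typo",       created_at: "2024-05-01T09:00:00Z" }
]

# Hypothetical helper mirroring the filter order used by list.
def page(rows, limit: nil, before: nil, q: nil, date: nil)
  rows = rows.select { |s| s[:created_at].to_s.start_with?(date) } if date
  if q && !q.empty?
    q_down = q.downcase
    rows = rows.select do |s|
      (s[:name] || "").downcase.include?(q_down) ||
        (s[:session_id] || "").downcase.include?(q_down)
    end
  end
  rows = rows.select { |s| (s[:created_at] || "") < before } if before
  rows = rows.first(limit) if limit
  rows.map { |s| s[:session_id] }
end

first_page  = page(records, limit: 2)
# Use the created_at of the last item on the previous page as the cursor.
second_page = page(records, limit: 2, before: "2024-05-02T09:00:00Z")
search      = page(records, q: "FIX")
by_date     = page(records, date: "2024-05-02")
```

Filters are applied before the cursor and the limit, so a page is always cut from the already-filtered list.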

#restore_from_disk(n: 5) ⇒ Object

Restore the most recent sessions from disk (up to n per source type) into the registry. Used at startup. Already-registered sessions are skipped and do not count toward n.



# File 'lib/clacky/server/session_registry.rb', line 103

def restore_from_disk(n: 5)
  return unless @session_manager && @session_restorer

  all = @session_manager.all_sessions
    .sort_by { |s| s[:created_at] || "" }
    .reverse

  # Take up to n per source type
  counts = Hash.new(0)
  all.each do |session_data|
    src = (session_data[:source] || "manual").to_s
    next if counts[src] >= n
    next if exist?(session_data[:session_id])
    @session_restorer.call(session_data)
    counts[src] += 1
  end
end
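
The per-source cap is a small counting loop. The sketch below runs it on made-up records and collects ids in place of calling a restorer; the session ids and dates are illustrative only.

```ruby
# Made-up disk records; a nil source is treated as "manual".
sessions = [
  { session_id: "m3", source: "manual", created_at: "2024-05-05" },
  { session_id: "c2", source: "cron",   created_at: "2024-05-04" },
  { session_id: "m2", source: "manual", created_at: "2024-05-03" },
  { session_id: "m1", source: nil,      created_at: "2024-05-02" },
  { session_id: "c1", source: "cron",   created_at: "2024-05-01" }
]

n        = 2
counts   = Hash.new(0) # missing keys start at 0
restored = []

# Newest first, then take at most n per source.
sessions.sort_by { |s| s[:created_at] || "" }.reverse.each do |s|
  src = (s[:source] || "manual").to_s
  next if counts[src] >= n
  restored << s[:session_id] # the real method calls the restorer here
  counts[src] += 1
end
```

With n = 2 the third manual session ("m1") is skipped, while both cron sessions fit under the cap.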

#s_source(s) ⇒ Object

Normalize source field from a disk session hash. “system” is a legacy value renamed to “setup” — treat them as equivalent.



# File 'lib/clacky/server/session_registry.rb', line 214

def s_source(s)
  src = (s[:source] || "manual").to_s
  src == "system" ? "setup" : src
end

#session_summary(session_id) ⇒ Object

Build a summary hash for API responses (for in-registry sessions). Used when we need live agent fields (name, cost, etc.) after ensure().



# File 'lib/clacky/server/session_registry.rb', line 259

def session_summary(session_id)
  session = @mutex.synchronize { @sessions[session_id] }
  return nil unless session
  agent = session[:agent]
  return nil unless agent

  model_info = agent.current_model_info
  {
    id:              session[:id],
    name:            agent.name,
    working_dir:     agent.working_dir,
    status:          session[:status],
    created_at:      agent.created_at,
    updated_at:      session[:updated_at].iso8601,
    total_tasks:     agent.total_tasks || 0,
    total_cost:      agent.total_cost  || 0.0,
    error:           session[:error],
    model:           model_info&.dig(:model),
    permission_mode: agent.permission_mode,
    source:          agent.source.to_s,
    agent_profile:   agent.agent_profile.name,
  }
end

#update(session_id, **fields) ⇒ Object

Update arbitrary runtime fields of a session (status, error, pending_*, etc.).



# File 'lib/clacky/server/session_registry.rb', line 127

def update(session_id, **fields)
  @mutex.synchronize do
    session = @sessions[session_id]
    return false unless session

    fields[:updated_at] = Time.now
    session.merge!(fields)
    true
  end
end

#with_session(session_id) ⇒ Object

Execute a block with exclusive access to the raw session hash.



# File 'lib/clacky/server/session_registry.rb', line 239

def with_session(session_id)
  @mutex.synchronize do
    session = @sessions[session_id]
    return nil unless session
    yield session
  end
end
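
A usage sketch for the create-then-wire flow, using a cut-down stand-in class. `MiniRegistry` and `:fake_agent` are hypothetical; only the bodies of `create` and `with_session` follow the listings above.

```ruby
# Cut-down stand-in for illustration; not the real SessionRegistry.
class MiniRegistry
  def initialize
    @sessions = {}
    @mutex    = Mutex.new
  end

  def create(session_id:)
    @mutex.synchronize { @sessions[session_id] = { id: session_id, status: :idle, agent: nil } }
    session_id
  end

  def with_session(session_id)
    @mutex.synchronize do
      session = @sessions[session_id]
      return nil unless session
      yield session
    end
  end
end

registry = MiniRegistry.new
id = registry.create(session_id: "s1")

registry.with_session(id) { |s| s[:agent] = :fake_agent } # wire the agent in later
agent   = registry.with_session(id) { |s| s[:agent] }      # block value is returned
missing = registry.with_session("nope") { |s| s[:agent] }  # unknown id: nil, block not run

# Ruby's Mutex is not reentrant: calling back into the registry from
# inside the block raises ThreadError rather than nesting the lock.
reentrant_error =
  begin
    registry.with_session(id) { registry.with_session(id) { |s| s } }
    nil
  rescue ThreadError => e
    e.class
  end
```

The block should therefore stay short and avoid calling other locking methods of the same registry.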