Class: Clacky::Server::SessionRegistry
- Inherits: Object
  - Object
  - Clacky::Server::SessionRegistry
- Defined in: lib/clacky/server/session_registry.rb
Overview
SessionRegistry is the single authoritative source for session state.
It owns two concerns:
1. Runtime state — agent instance, thread, status, pending_task, idle_timer.
2. Session list — reads from disk (via session_manager) and enriches with
live runtime status. `list` is the only place the session
list is assembled; no callers should build it elsewhere.
Lazy restore: `ensure(session_id)` loads a disk session into the registry on demand. All session-specific APIs call this before touching the registry, so disk-only sessions (e.g. loaded via loadMore) work transparently.
Thread safety: all public methods are protected by a Mutex.
Constant Summary
- SESSION_TIMEOUT = 24 * 60 * 60
  24 hours of inactivity before cleanup
Instance Method Summary
-
#cleanup_stale! ⇒ Object
Remove sessions idle longer than SESSION_TIMEOUT.
-
#create(session_id:) ⇒ Object
Create a new (empty) session entry and return its id.
-
#delete(session_id) ⇒ Object
Delete a session from registry (and interrupt its thread).
-
#ensure(session_id) ⇒ Object
Ensure a session is in the registry, loading from disk if necessary.
-
#exist?(session_id) ⇒ Boolean
True if the session exists in registry (runtime).
-
#get(session_id) ⇒ Object
Retrieve a session hash by id (returns nil if not found).
-
#initialize(session_manager: nil, session_restorer: nil) ⇒ SessionRegistry
constructor
session_manager: a Clacky::SessionManager instance.
session_restorer: a callable(session_data) → session_id that builds the agent and wires it into the registry.
-
#list(limit: nil, before: nil, q: nil, date: nil, type: nil) ⇒ Object
Return a session list from disk enriched with live registry status.
-
#restore_from_disk(n: 5) ⇒ Object
Restore all sessions from disk (up to n per source type) into the registry.
-
#s_source(s) ⇒ Object
Normalize source field from a disk session hash.
-
#session_summary(session_id) ⇒ Object
Build a summary hash for API responses (for in-registry sessions).
-
#update(session_id, **fields) ⇒ Object
Update arbitrary runtime fields of a session (status, error, pending_*, etc.).
-
#with_session(session_id) ⇒ Object
Execute a block with exclusive access to the raw session hash.
Constructor Details
#initialize(session_manager: nil, session_restorer: nil) ⇒ SessionRegistry
session_manager: a Clacky::SessionManager instance.
session_restorer: a callable(session_data) → session_id that builds the agent and wires it into the registry.
    # File 'lib/clacky/server/session_registry.rb', line 23
    def initialize(session_manager: nil, session_restorer: nil)
      @sessions = {}
      @mutex = Mutex.new
      @session_manager = session_manager
      @session_restorer = session_restorer
      # Tracks sessions currently being restored from disk.
      # Other threads calling ensure() for the same id will wait via @restore_cond
      # instead of seeing a half-built session (agent=nil).
      @restoring = {}
      @restore_cond = ConditionVariable.new
    end
Instance Method Details
#cleanup_stale! ⇒ Object
Remove sessions idle longer than SESSION_TIMEOUT.
    # File 'lib/clacky/server/session_registry.rb', line 248
    def cleanup_stale!
      cutoff = Time.now - SESSION_TIMEOUT
      @mutex.synchronize do
        @sessions.delete_if do |_id, session|
          session[:status] == :idle && session[:updated_at] < cutoff
        end
      end
    end
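The eviction rule can be tried in isolation. The following is a minimal sketch on a plain Hash, not the registry itself. The helper name `evict_stale`, the session ids, and the `:running` status are assumptions made for illustration; only `:idle` appears in the source. It shows that a session past the cutoff is kept as long as its status is not `:idle`.

```ruby
# Hypothetical helper mirroring the condition used by cleanup_stale!.
# `sessions` is a plain Hash of id => { status:, updated_at: }.
def evict_stale(sessions, now:, timeout:)
  cutoff = now - timeout
  # Drop only entries that are both idle and older than the cutoff.
  sessions.delete_if { |_id, s| s[:status] == :idle && s[:updated_at] < cutoff }
  sessions.keys
end

# Invented sample data for the sketch.
def sample_stale_sessions(now, timeout)
  {
    "fresh-idle" => { status: :idle,    updated_at: now - 60 },
    "stale-idle" => { status: :idle,    updated_at: now - timeout - 1 },
    "stale-busy" => { status: :running, updated_at: now - timeout - 1 }
  }
end
```

Calling `evict_stale(sample_stale_sessions(now, 24 * 60 * 60), now: now, timeout: 24 * 60 * 60)` removes only `"stale-idle"`.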
#create(session_id:) ⇒ Object
Create a new (empty) session entry and return its id. The agent, ui, and thread fields are set later via with_session once they are constructed.
    # File 'lib/clacky/server/session_registry.rb', line 37
    def create(session_id:)
      raise ArgumentError, "session_id is required" if session_id.nil? || session_id.empty?

      session = {
        id: session_id,
        status: :idle,
        error: nil,
        updated_at: Time.now,
        agent: nil,
        ui: nil,
        thread: nil,
        idle_timer: nil,
        pending_task: nil,
        pending_working_dir: nil
      }

      @mutex.synchronize { @sessions[session_id] = session }
      session_id
    end
#delete(session_id) ⇒ Object
Delete a session from registry (and interrupt its thread).
    # File 'lib/clacky/server/session_registry.rb', line 222
    def delete(session_id)
      @mutex.synchronize do
        session = @sessions.delete(session_id)
        return false unless session

        session[:idle_timer]&.cancel
        session[:thread]&.raise(Clacky::AgentInterrupted, "Session deleted")
        true
      end
    end
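The interruption step relies on Ruby's `Thread#raise`, which injects an exception into another thread. The sketch below illustrates that mechanism on its own. `DemoInterrupted` is a hypothetical stand-in for `Clacky::AgentInterrupted`, and `interrupt_worker` is an invented helper, so this is not how the agent thread itself is written.

```ruby
# Hypothetical stand-in for Clacky::AgentInterrupted.
class DemoInterrupted < StandardError; end

# Starts a blocked worker, interrupts it with Thread#raise, and returns
# the message the worker rescued.
def interrupt_worker
  ready = Queue.new
  worker = Thread.new do
    begin
      ready << :started   # signal that we are inside the begin block
      sleep               # block until interrupted
      :finished
    rescue DemoInterrupted => e
      e.message
    end
  end
  ready.pop               # wait until the worker can rescue the exception
  worker.raise(DemoInterrupted, "Session deleted")
  worker.value            # joins the thread and returns the block's result
end
```

A worker that does not rescue the exception simply terminates, so whether a deleted session gets to clean up depends on how its thread handles the interrupt.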
#ensure(session_id) ⇒ Object
Ensure a session is in the registry, loading from disk if necessary. Returns true if the session is now available, false if not found anywhere.
Thread-safe: if two threads race on the same session_id, the second one waits for the first to finish restoring (including agent construction) rather than seeing a half-built entry with agent=nil.
    # File 'lib/clacky/server/session_registry.rb', line 63
    def ensure(session_id)
      session_data = nil

      @mutex.synchronize do
        # Another thread is currently restoring this session (including the case where
        # @registry.create was already called but with_session agent-set is not yet done) —
        # wait for it to finish so callers never see agent=nil.
        if @restoring[session_id]
          @restore_cond.wait(@mutex) until !@restoring[session_id]
          return @sessions.key?(session_id)
        end

        # Already fully ready (not being restored) — fast path.
        return true if @sessions.key?(session_id)

        return false unless @session_manager && @session_restorer

        session_data = @session_manager.load(session_id)
        return false unless session_data

        # Mark as "restore in progress" so concurrent callers wait.
        @restoring[session_id] = true
      end

      # Run the (potentially slow) restore outside the mutex so other sessions
      # are not blocked during agent construction.
      begin
        @session_restorer.call(session_data)
      ensure
        @mutex.synchronize do
          @restoring.delete(session_id)
          @restore_cond.broadcast
        end
      end

      @sessions.key?(session_id)
    end
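The "mark as restoring, do the slow work outside the lock, then broadcast" pattern can be sketched independently of the registry. The class `RestoreOnce`, its `ensure_loaded` method, and the `restore_calls` counter below are hypothetical names invented for this illustration. Unlike the real restorer, the block here returns the value to store instead of wiring itself into the registry.

```ruby
# Minimal stand-in for the restore-once pattern; not the real SessionRegistry.
class RestoreOnce
  attr_reader :restore_calls

  def initialize(&restorer)
    @items = {}
    @restoring = {}
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @restorer = restorer   # assumption: returns the value to store for an id
    @restore_calls = 0
  end

  def ensure_loaded(id)
    @mutex.synchronize do
      if @restoring[id]
        # Someone else is restoring this id: wait instead of restoring twice.
        @cond.wait(@mutex) while @restoring[id]
        return @items.key?(id)
      end
      return true if @items.key?(id)   # fast path: already loaded
      @restoring[id] = true
      @restore_calls += 1
    end

    begin
      value = @restorer.call(id)       # slow work, done outside the lock
      @mutex.synchronize { @items[id] = value }
    ensure
      # Always clear the marker and wake waiters, even if the restorer raised.
      @mutex.synchronize do
        @restoring.delete(id)
        @cond.broadcast
      end
    end
    @mutex.synchronize { @items.key?(id) }
  end
end

# Races four threads on the same id and reports what each one saw.
def demo_restore_once
  registry = RestoreOnce.new { |id| sleep 0.05; "agent-for-#{id}" }
  threads = 4.times.map { Thread.new { registry.ensure_loaded("s1") } }
  [threads.map(&:value), registry.restore_calls]
end
```

In this sketch every caller gets `true` while the restorer runs only once. If the restorer raises, waiting callers wake up and find no entry, so they return `false`.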
#exist?(session_id) ⇒ Boolean
True if the session exists in registry (runtime).
    # File 'lib/clacky/server/session_registry.rb', line 234
    def exist?(session_id)
      @mutex.synchronize { @sessions.key?(session_id) }
    end
#get(session_id) ⇒ Object
Retrieve a session hash by id (returns nil if not found).
    # File 'lib/clacky/server/session_registry.rb', line 122
    def get(session_id)
      @mutex.synchronize { @sessions[session_id]&.dup }
    end
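Note that `Hash#dup` makes a shallow copy. The sketch below, using an invented hash with a hypothetical `:meta` key, shows what that means for callers of `get`: reassigning a top-level key on the returned hash leaves the stored entry untouched, but objects referenced by the hash (in the registry, presumably the agent, ui, and thread) are shared with the original.

```ruby
# Demonstrates shallow-copy semantics of Hash#dup on a made-up session hash.
def shallow_copy_demo
  stored = { status: :idle, meta: { tags: ["a"] } }   # hypothetical shape
  copy = stored.dup

  copy[:status] = :running     # top-level change: local to the copy
  copy[:meta][:tags] << "b"    # nested object: shared with the original

  [stored[:status], stored[:meta][:tags]]
end
```

Here `shallow_copy_demo` returns `[:idle, ["a", "b"]]`: the status of the stored hash is unchanged, while the nested array was mutated through the copy.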
#list(limit: nil, before: nil, q: nil, date: nil, type: nil) ⇒ Object
Return a session list from disk enriched with live registry status. Sorted by created_at descending (newest first).
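Parameters (all optional, independent):
type: "coding"|"manual"|"cron"|"channel"|"setup"|nil
"coding" selects sessions whose agent_profile is "coding"; any other value matches the normalized source and excludes coding sessions
nil = no type filter (all sessions)
date: "YYYY-MM-DD", only sessions whose created_at starts with this prefix
q: case-insensitive substring match against the session name or id
limit: max sessions to return
before: ISO8601 cursor, only sessions with created_at < before
type replaces the older separate source and profile filters.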
    # File 'lib/clacky/server/session_registry.rb', line 150
    def list(limit: nil, before: nil, q: nil, date: nil, type: nil)
      return [] unless @session_manager

      live = @mutex.synchronize do
        @sessions.transform_values do |s|
          model_info = s[:agent]&.current_model_info
          live_name = s[:agent]&.name
          live_name = nil if live_name&.empty?
          { status: s[:status], error: s[:error], model: model_info&.dig(:model), name: live_name,
            total_tasks: s[:agent]&.total_tasks, total_cost: s[:agent]&.total_cost }
        end
      end

      all = @session_manager.all_sessions # already sorted newest-first

      # ── type filter (replaces old source/profile split) ──────────────────
      # type=coding → agent_profile == "coding"
      # type=manual/cron/channel/setup → source match (profile=general implied)
      if type
        if type == "coding"
          all = all.select { |s| (s[:agent_profile] || "general").to_s == "coding" }
        else
          all = all.select { |s| s_source(s) == type && (s[:agent_profile] || "general").to_s != "coding" }
        end
      end

      # ── date filter (YYYY-MM-DD, matches created_at prefix) ──────────────
      all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date

      # ── name / id search ─────────────────────────────────────────────────
      if q && !q.empty?
        q_down = q.downcase
        all = all.select { |s|
          (s[:name] || "").downcase.include?(q_down) ||
            (s[:session_id] || "").downcase.include?(q_down)
        }
      end

      all = all.select { |s| (s[:created_at] || "") < before } if before
      all = all.first(limit) if limit

      all.map do |s|
        id = s[:session_id]
        ls = live[id]
        {
          id: id,
          name: ls&.dig(:name) || s[:name] || "",
          status: ls ? ls[:status].to_s : "idle",
          error: ls ? ls[:error] : nil,
          model: ls&.dig(:model),
          source: s_source(s),
          agent_profile: (s[:agent_profile] || "general").to_s,
          working_dir: s[:working_dir],
          created_at: s[:created_at],
          updated_at: s[:updated_at],
          total_tasks: ls&.dig(:total_tasks) || s.dig(:stats, :total_tasks) || 0,
          total_cost: ls&.dig(:total_cost) || s.dig(:stats, :total_cost_usd) || 0.0,
        }
      end
    end
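The filter order in `list` (type, then date, then search, then cursor, then limit) can be exercised without a session manager. The sketch below re-implements only that pipeline over an in-memory array. `filter_sessions` and `sample_sessions` are hypothetical helpers and the records are invented; the live-status enrichment is left out.

```ruby
# Invented disk records, newest first, using the keys that `list` reads.
def sample_sessions
  [
    { session_id: "c3", name: "Deploy fix", created_at: "2025-01-03T09:00:00Z", source: "cron" },
    { session_id: "b2", name: "Refactor",   created_at: "2025-01-02T09:00:00Z", source: "manual",
      agent_profile: "coding" },
    { session_id: "a1", name: "Setup",      created_at: "2025-01-01T09:00:00Z", source: "system" }
  ]
end

# Hypothetical re-implementation of the filter steps; returns matching ids.
def filter_sessions(all, type: nil, date: nil, q: nil, before: nil, limit: nil)
  # Same normalization as s_source: legacy "system" is treated as "setup".
  source_of  = ->(s) { src = (s[:source] || "manual").to_s; src == "system" ? "setup" : src }
  profile_of = ->(s) { (s[:agent_profile] || "general").to_s }

  if type == "coding"
    all = all.select { |s| profile_of.(s) == "coding" }
  elsif type
    all = all.select { |s| source_of.(s) == type && profile_of.(s) != "coding" }
  end
  all = all.select { |s| s[:created_at].to_s.start_with?(date) } if date
  if q && !q.empty?
    needle = q.downcase
    all = all.select do |s|
      (s[:name] || "").downcase.include?(needle) ||
        (s[:session_id] || "").downcase.include?(needle)
    end
  end
  # Cursor: plain string comparison, so timestamps must share one format.
  all = all.select { |s| (s[:created_at] || "") < before } if before
  all = all.first(limit) if limit
  all.map { |s| s[:session_id] }
end
```

For paging, a caller would pass the `created_at` of the last item it received as `before`. With the sample data, `filter_sessions(sample_sessions, before: "2025-01-03T09:00:00Z", limit: 1)` yields `["b2"]`. Because the cursor is compared as a string, it is only reliable when all timestamps use the same ISO8601 layout and time zone.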
#restore_from_disk(n: 5) ⇒ Object
Restore all sessions from disk (up to n per source type) into the registry. Used at startup. Already-registered sessions are skipped.
    # File 'lib/clacky/server/session_registry.rb', line 103
    def restore_from_disk(n: 5)
      return unless @session_manager && @session_restorer

      all = @session_manager.all_sessions
        .sort_by { |s| s[:created_at] || "" }
        .reverse

      # Take up to n per source type
      counts = Hash.new(0)
      all.each do |session_data|
        src = (session_data[:source] || "manual").to_s
        next if counts[src] >= n
        next if exist?(session_data[:session_id])

        @session_restorer.call(session_data)
        counts[src] += 1
      end
    end
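The per-source cap is a small counting idiom built on `Hash.new(0)`. The sketch below isolates it. `pick_for_restore` and `sample_disk_sessions` are hypothetical helpers with invented data; the sketch returns the ids that would be restored instead of calling a restorer, and it omits the "already registered" check.

```ruby
# Hypothetical helper: picks the newest n sessions per source.
def pick_for_restore(sessions, n:)
  counts = Hash.new(0)   # missing keys start at 0
  sessions
    .sort_by { |s| s[:created_at] || "" }
    .reverse             # newest first
    .each_with_object([]) do |s, picked|
      src = (s[:source] || "manual").to_s   # a missing source counts as "manual"
      next if counts[src] >= n
      picked << s[:session_id]
      counts[src] += 1
    end
end

# Invented sample data for the sketch.
def sample_disk_sessions
  [
    { session_id: "m1", source: "manual", created_at: "2025-01-01" },
    { session_id: "m2", source: "manual", created_at: "2025-01-02" },
    { session_id: "m3",                   created_at: "2025-01-03" },
    { session_id: "c1", source: "cron",   created_at: "2025-01-04" }
  ]
end
```

With `n: 2`, the oldest manual session `"m1"` is skipped. As in the method above, the raw source string is used for counting, so a legacy "system" session and a "setup" session would appear to be capped separately.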
#s_source(s) ⇒ Object
Normalize source field from a disk session hash. “system” is a legacy value renamed to “setup” — treat them as equivalent.
    # File 'lib/clacky/server/session_registry.rb', line 214
    def s_source(s)
      src = (s[:source] || "manual").to_s
      src == "system" ? "setup" : src
    end
#session_summary(session_id) ⇒ Object
Build a summary hash for API responses (for in-registry sessions). Used when we need live agent fields (name, cost, etc.) after ensure().
    # File 'lib/clacky/server/session_registry.rb', line 259
    def session_summary(session_id)
      session = @mutex.synchronize { @sessions[session_id] }
      return nil unless session

      agent = session[:agent]
      return nil unless agent

      model_info = agent.current_model_info

      {
        id: session[:id],
        name: agent.name,
        working_dir: agent.working_dir,
        status: session[:status],
        created_at: agent.created_at,
        updated_at: session[:updated_at].iso8601,
        total_tasks: agent.total_tasks || 0,
        total_cost: agent.total_cost || 0.0,
        error: session[:error],
        model: model_info&.dig(:model),
        permission_mode: agent.,
        source: agent.source.to_s,
        agent_profile: agent.agent_profile.name,
      }
    end
#update(session_id, **fields) ⇒ Object
Update arbitrary runtime fields of a session (status, error, pending_*, etc.).
    # File 'lib/clacky/server/session_registry.rb', line 127
    def update(session_id, **fields)
      @mutex.synchronize do
        session = @sessions[session_id]
        return false unless session

        fields[:updated_at] = Time.now
        session.merge!(fields)
        true
      end
    end
#with_session(session_id) ⇒ Object
Execute a block with exclusive access to the raw session hash.
    # File 'lib/clacky/server/session_registry.rb', line 239
    def with_session(session_id)
      @mutex.synchronize do
        session = @sessions[session_id]
        return nil unless session
        yield session
      end
    end
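Since the block runs while the registry's Mutex is held, and Ruby's `Mutex` is not reentrant, calling another locking registry method (such as `update` or `get`) from inside the block would presumably raise `ThreadError`. The sketch below shows the underlying Ruby behaviour with a standalone mutex. `nested_lock_error` is a hypothetical helper, not part of the registry.

```ruby
# Demonstrates that re-locking a Mutex from the same thread raises ThreadError.
def nested_lock_error
  mutex = Mutex.new
  mutex.synchronize do
    mutex.synchronize { :unreachable }   # same thread, same lock
  end
rescue ThreadError => e
  e.class
end
```

Inside a `with_session` block it is therefore safer to read and write the yielded hash directly, and to keep the block short so other sessions are not held up.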