Class: Kward::RPC::SessionManager
- Inherits: Object
  - Object
    - Kward::RPC::SessionManager
- Defined in: lib/kward/rpc/session_manager.rb
Overview
Owns RPC-visible session lifecycle, async turn queues, and frontend events.
Server handles JSON-RPC framing/dispatch; SessionManager handles the
product state behind those methods. It creates/resumes SessionStore
sessions, builds agents with RPC prompt bridges, serializes turn events for
clients, coordinates cancellation and follow-up queues, and integrates
memory/plugin hooks for RPC sessions.
Keep JSON-RPC wire shape normalization in the RPC::*Normalizer classes,
persistence in SessionStore, and model/tool behavior in Agent and
ToolRegistry. This class should coordinate those pieces rather than own
their low-level mechanics.
Defined Under Namespace
Classes: RpcSession, Turn
Constant Summary
- RECENT_EVENT_LIMIT = 1_000
- RPC_ATTACHMENT_MAX_BYTES = AttachmentNormalizer::MAX_BYTES
- RPC_IMAGE_MIME_TYPES = AttachmentNormalizer::IMAGE_MIME_TYPES
- STREAMING_BEHAVIORS = ["newTurn", "followUp", "steer"].freeze
- FOOTER_REFRESH_INTERVAL = 1.0
- WORKER_STOP = Object.new.freeze
Instance Method Summary
- #answer_question(session_id:, question_request_id:, answers:) ⇒ Object
- #available_models ⇒ Object
- #cancel_turn(turn_id:) ⇒ Object
- #cleanup_unused_sessions ⇒ Object
  Closes idle empty sessions left behind by UI lifecycle transitions.
- #clone_session(session_id:) ⇒ Object
  Creates an independent copy of the current conversation branch.
- #close_session(session_id:) ⇒ Object
  Stops workers and removes an RPC session from the live session map.
- #compact_session(session_id:, custom_instructions: "") ⇒ Object
  Compacts an RPC session and emits start/end events for UI progress.
- #create_session(workspace_root: Dir.pwd, name: nil, resume_last: false) ⇒ Object
  Creates a new RPC session or resumes the remembered session when allowed.
- #current_model ⇒ Object
- #delete_session(session_id:) ⇒ Object
  Deletes the backing session file through the configured trash strategy.
- #export_session(session_id:, path: nil, format: nil) ⇒ Object
  Exports the current transcript in markdown or JSON format.
- #fork_messages(session_id:) ⇒ Object
  Lists user-message entries that can be used as fork points.
- #fork_session(session_id:, entry_id:) ⇒ Object
  Creates a new session from history before the selected user message.
- #in_flight_steer_supported? ⇒ Boolean
- #initialize(server:, client: Client.new, config_dir: ConfigFiles.config_dir, config_manager: ConfigManager.new(config_path: File.join(config_dir, "config.json")), context_usage: ContextUsage.new, session_trash: SessionTrash.new) ⇒ SessionManager (constructor)
  Creates an object for RPC session lifecycle and turn coordination.
- #list_sessions(workspace_root: Dir.pwd, limit: nil, current_session_path: nil) ⇒ Object
- #memory_add(text:, scope: nil, tags: []) ⇒ Object
- #memory_add_core(text:, scope: nil, tags: []) ⇒ Object
- #memory_auto_summary_disable ⇒ Object
- #memory_auto_summary_enable ⇒ Object
- #memory_disable ⇒ Object
- #memory_enable ⇒ Object
- #memory_forget(id:) ⇒ Object
- #memory_inspect ⇒ Object
- #memory_list(include_inactive: false, workspace_root: Dir.pwd) ⇒ Object
- #memory_manager ⇒ Object
- #memory_promote(id:) ⇒ Object
- #memory_relax(id:, workspace_root: Dir.pwd) ⇒ Object
- #memory_status ⇒ Object
- #memory_summarize(session_id:) ⇒ Object
- #memory_why(session_id: nil) ⇒ Object
- #navigate_tree(session_id:, entry_id:, summarize: false, custom_instructions: nil) ⇒ Object
  Moves the active branch to a tree entry, optionally summarizing abandoned history.
- #openrouter_catalog ⇒ Object
- #plugin_commands ⇒ Object
- #refresh_client_config ⇒ Object
- #reload_plugins ⇒ Object
- #rename_session(session_id:, name:) ⇒ Object
  Renames the persisted session attached to an RPC session id.
- #resume_session(path:, workspace_root: nil, include_transcript: false) ⇒ Object
- #run_command(session_id:, command:, arguments: "") ⇒ Object
- #run_plugin_command(session_id:, command:, arguments: "") ⇒ Object
- #runtime_state(session_id:) ⇒ Object
- #runtime_stats(session_id:) ⇒ Object
- #session_model(rpc_session) ⇒ Object
- #session_modified_at(session) ⇒ Object
- #session_payload(rpc_session) ⇒ Object
- #session_tree(session_id:) ⇒ Object
  Returns the flattened session tree rows consumed by RPC clients.
- #set_tree_label(session_id:, entry_id:, label: nil) ⇒ Object
  Persists a label override for one tree entry.
- #start_turn(session_id:, input:, streaming_behavior: nil, attachments: []) ⇒ Object
  Queues or starts an async model turn for an RPC session.
- #transcript(session_id:) ⇒ Object
  Returns the normalized transcript for the active RPC session.
- #turn_events(turn_id:, after_sequence: 0) ⇒ Object
- #turn_status(turn_id:) ⇒ Object
- #validate_workspace_root(root) ⇒ Object
Constructor Details
#initialize(server:, client: Client.new, config_dir: ConfigFiles.config_dir, config_manager: ConfigManager.new(config_path: File.join(config_dir, "config.json")), context_usage: ContextUsage.new, session_trash: SessionTrash.new) ⇒ SessionManager
Creates an object for RPC session lifecycle and turn coordination.
# File 'lib/kward/rpc/session_manager.rb', line 65
def initialize(
  server:,
  client: Client.new,
  config_dir: ConfigFiles.config_dir,
  config_manager: ConfigManager.new(config_path: File.join(config_dir, "config.json")),
  context_usage: ContextUsage.new,
  session_trash: SessionTrash.new
)
  @server = server
  @client = client
  @config_dir = config_dir
  @config_manager = config_manager
  @context_usage = context_usage
  @session_metrics = SessionMetrics.new(context_usage: context_usage)
  @session_trash = session_trash
  @sessions = {}
  @turns = {}
  @mutex = Mutex.new
end
Instance Method Details
#answer_question(session_id:, question_request_id:, answers:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 375
def answer_question(session_id:, question_request_id:, answers:)
  rpc_session = fetch_session(session_id)
  rpc_session.prompt.answer(question_request_id, answers)
  { ok: true }
end
#available_models ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 480
def available_models
  models = @client.respond_to?(:available_models) ? Array(@client.available_models) : []
  normalized = models.map { |model| normalize_model(model) }
  current = current_model
  normalized << current if normalized.none? { |model| model[:provider] == current[:provider] && model[:id] == current[:id] }
  normalized
end
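The last step of `available_models` appends the current model only when no listed entry has the same provider and id. A minimal, self-contained sketch of that check follows. `with_current` and the sample hashes are hypothetical, and unlike the method above the sketch returns a new array instead of appending in place.

```ruby
# Hypothetical helper: returns the model list, adding `current` only when
# no existing entry shares both its :provider and its :id.
def with_current(models, current)
  known = models.any? do |model|
    model[:provider] == current[:provider] && model[:id] == current[:id]
  end
  known ? models : models + [current]
end

# Sample data; the provider and model names are made up for illustration.
listed = [{ provider: "alpha", id: "small" }]
same = with_current(listed, { provider: "alpha", id: "small", current: true })
extended = with_current(listed, { provider: "beta", id: "large", current: true })
```

Matching on the provider and id pair, not on the id alone, keeps two providers that expose a model under the same id as separate entries.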
#cancel_turn(turn_id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 351
def cancel_turn(turn_id:)
  turn = fetch_turn(turn_id)
  turn.cancel_requested = true
  turn.cancellation&.cancel!
  emit_turn_event(turn, "turnCancelRequested", {})
  if turn.status == "queued"
    finish_turn(turn, "canceled")
  end
  turn_payload(turn)
end
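`cancel_turn` treats queued and already-started turns differently: both get the cancel flag, but only a queued turn is finished as "canceled" immediately. The sketch below illustrates that split with a hypothetical `SketchTurn` struct. The "running" status string is an assumption, and the real method also signals a cancellation object and emits an event.

```ruby
# Hypothetical stand-in for a turn record; not Kward's Turn class.
SketchTurn = Struct.new(:status, :cancel_requested, keyword_init: true)

# Marks the turn as cancel-requested. Queued turns never started, so they
# can be finished right away; any other turn keeps its status and is left
# for its worker to wind down.
def request_cancel(turn)
  turn.cancel_requested = true
  turn.status = "canceled" if turn.status == "queued"
  turn
end

queued = request_cancel(SketchTurn.new(status: "queued", cancel_requested: false))
running = request_cancel(SketchTurn.new(status: "running", cancel_requested: false))
```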
#cleanup_unused_sessions ⇒ Object
Closes idle empty sessions left behind by UI lifecycle transitions.
# File 'lib/kward/rpc/session_manager.rb', line 294
def cleanup_unused_sessions
  rpc_sessions = @mutex.synchronize { @sessions.values.dup }
  rpc_sessions.reverse_each do |rpc_session|
    next unless session_idle?(rpc_session)

    close_rpc_session(rpc_session)
  end
  { closed: true }
end
#clone_session(session_id:) ⇒ Object
Creates an independent copy of the current conversation branch.
# File 'lib/kward/rpc/session_manager.rb', line 149
def clone_session(session_id:)
  source = fetch_session(session_id)
  session, conversation = source.store.create_independent_from_conversation(source.conversation, parent_session: source.session)
  rpc_session = build_rpc_session(source.store, session, conversation, source.workspace_root)
  remember_session(rpc_session)
  cleanup_other_unused_sessions(rpc_session)
  (rpc_session)
  session_payload(rpc_session)
end
#close_session(session_id:) ⇒ Object
Stops workers and removes an RPC session from the live session map.
# File 'lib/kward/rpc/session_manager.rb', line 287
def close_session(session_id:)
  rpc_session = fetch_session(session_id)
  close_rpc_session(rpc_session)
  { closed: true }
end
#compact_session(session_id:, custom_instructions: "") ⇒ Object
Compacts an RPC session and emits start/end events for UI progress.
# File 'lib/kward/rpc/session_manager.rb', line 160
def compact_session(session_id:, custom_instructions: "")
  rpc_session = fetch_session(session_id)
  emit_session_event(rpc_session, "compactionStart", {})
  result = Compactor.new(conversation: rpc_session.conversation, client: @client, settings: compaction_settings).compact(custom_instructions: custom_instructions)
  payload = {
    summary: result.summary,
    firstKeptEntryId: result.first_kept_entry_id,
    tokensBefore: result.tokens_before,
    details: result.details
  }.compact
  emit_session_event(rpc_session, "compactionEnd", { result: payload, aborted: false, willRetry: false, errorMessage: nil })
  payload
rescue StandardError => e
  emit_session_event(rpc_session, "compactionEnd", { result: nil, aborted: true, willRetry: false, errorMessage: e. }) if rpc_session
  raise e
end
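`compact_session` brackets the work with a start event and an end event, and still emits the end event when the work raises, so a UI progress indicator can always be cleared. A rough sketch of that bracket pattern follows. `with_progress_events` and the plain array used as an event sink are hypothetical, and using `e.message` as the error text is an assumption rather than something the listing above confirms.

```ruby
# Hypothetical helper: records a start event, runs the block, then records
# an end event that says whether the work completed or was aborted.
def with_progress_events(events)
  events << { type: "compactionStart" }
  result = yield
  events << { type: "compactionEnd", aborted: false, result: result }
  result
rescue StandardError => e
  # Assumption: the exception message is what gets reported to the client.
  events << { type: "compactionEnd", aborted: true, result: nil, errorMessage: e.message }
  raise
end

ok_events = []
value = with_progress_events(ok_events) { 42 }

failed_events = []
begin
  with_progress_events(failed_events) { raise ArgumentError, "boom" }
rescue ArgumentError
  # The error still propagates to the caller after the end event is recorded.
end
```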
#create_session(workspace_root: Dir.pwd, name: nil, resume_last: false) ⇒ Object
Creates a new RPC session or resumes the remembered session when allowed.
Returns the normalized session payload expected by RPC clients. The RPC session id is separate from the persisted session id so one persisted file can be closed and reopened by different client connections.
# File 'lib/kward/rpc/session_manager.rb', line 90
def create_session(workspace_root: Dir.pwd, name: nil, resume_last: false)
  workspace_root = validate_workspace_root(workspace_root)
  store = SessionStore.new(config_dir: @config_dir, cwd: workspace_root)
  if resume_last && session_auto_resume_enabled? && name.to_s.strip.empty?
    path = store.remembered_last_session_path
    return resume_session(path: path, workspace_root: workspace_root, include_transcript: true) if path
  end

  conversation = new_conversation(workspace_root: workspace_root)
  session = store.create(provider: conversation.provider, model: conversation.model, reasoning_effort: conversation.reasoning_effort)
  session.rename(name) unless name.to_s.strip.empty?
  session.attach(conversation)
  rpc_session = build_rpc_session(store, session, conversation, workspace_root)
  remember_session(rpc_session)
  cleanup_other_unused_sessions(rpc_session)
  (rpc_session)
  session_payload(rpc_session)
end
#current_model ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 493
def current_model
  provider = @client.respond_to?(:current_provider) ? @client.current_provider : nil
  model = @client.respond_to?(:current_model) ? @client.current_model : nil
  context_window = @client.respond_to?(:current_context_window) ? @client.current_context_window : nil
  normalize_model(provider: provider, id: model, model: model, contextWindow: context_window, current: true)
end
#delete_session(session_id:) ⇒ Object
Deletes the backing session file through the configured trash strategy.
# File 'lib/kward/rpc/session_manager.rb', line 278
def delete_session(session_id:)
  rpc_session = fetch_session(session_id)
  path = rpc_session.session.path
  close_rpc_session(rpc_session, delete_unused: false)
  deleted = @session_trash.delete(path)
  { deleted: deleted, path: path }
end
#export_session(session_id:, path: nil, format: nil) ⇒ Object
Exports the current transcript in markdown or JSON format.
# File 'lib/kward/rpc/session_manager.rb', line 268
def export_session(session_id:, path: nil, format: nil)
  rpc_session = fetch_session(session_id)
  format = export_format(format)
  path = export_path(rpc_session, path, format)
  content = export_content(rpc_session.conversation, format)
  File.write(path, content)
  { path: path, format: format }
end
#fork_messages(session_id:) ⇒ Object
Lists user-message entries that can be used as fork points.
# File 'lib/kward/rpc/session_manager.rb', line 178
def (session_id:)
  rpc_session = fetch_session(session_id)
  {
    messages: session_tree_helper(rpc_session).entries.filter_map do |record|
       = record["message"]
      next unless .is_a?(Hash) && () == "user"

      { entryId: record["id"], text: () }
    end
  }
end
#fork_session(session_id:, entry_id:) ⇒ Object
Creates a new session from history before the selected user message.
# File 'lib/kward/rpc/session_manager.rb', line 191
def fork_session(session_id:, entry_id:)
  source = fetch_session(session_id)
  tree = session_tree_helper(source)
  entries = tree.entries
  resolved_entry_id = tree.resolve_entry_id(entry_id, entries: entries)
  selected_index = entries.index { |record| record["id"].to_s == resolved_entry_id.to_s }
  selected = selected_index && entries[selected_index]
  raise ArgumentError, "Unknown fork entryId: #{entry_id}" unless selected

   = selected["message"]
  raise ArgumentError, "Entry is not forkable: #{entry_id}" unless .is_a?(Hash) && () == "user"

  session, conversation = source.store.(
    entries[0...selected_index].filter_map { |record| record["message"] },
    provider: source.conversation.provider,
    model: source.conversation.model,
    reasoning_effort: source.conversation.reasoning_effort,
    parent_session: source.session
  )
  rpc_session = build_rpc_session(source.store, session, conversation, source.workspace_root)
  remember_session(rpc_session)
  cleanup_other_unused_sessions(rpc_session)

  {
    session: session_payload(rpc_session),
    text: (),
    cancelled: false
  }
end
#in_flight_steer_supported? ⇒ Boolean
# File 'lib/kward/rpc/session_manager.rb', line 517
def in_flight_steer_supported?
  supports_in_flight_steer?
end
#list_sessions(workspace_root: Dir.pwd, limit: nil, current_session_path: nil) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 132
def list_sessions(workspace_root: Dir.pwd, limit: nil, current_session_path: nil)
  root = validate_workspace_root(workspace_root)
  store = SessionStore.new(config_dir: @config_dir, cwd: root)
  requested_limit = limit.to_i if limit
  requested_limit = nil unless requested_limit&.positive?
  store.recent(limit: requested_limit, keep_empty_path: current_session_path)
       .map { |info| session_info_payload(info, workspace_root: root) }
end
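The two `requested_limit` lines in `list_sessions` coerce whatever the client sent into either a positive integer or `nil`, which presumably means "no limit" to `store.recent`. The same two lines are isolated below in a hypothetical `normalize_limit` helper so the edge cases are easy to see.

```ruby
# Hypothetical helper wrapping the limit handling from list_sessions.
# Returns a positive Integer, or nil when the input is missing, zero,
# negative, or not numeric.
def normalize_limit(limit)
  requested = limit.to_i if limit
  requested = nil unless requested&.positive?
  requested
end

from_string = normalize_limit("5")
from_nil = normalize_limit(nil)
from_zero = normalize_limit(0)
from_negative = normalize_limit(-3)
from_text = normalize_limit("abc") # "abc".to_i is 0, so this becomes nil
```

The safe-navigation call matters here: when `limit` is `nil`, the local variable is also `nil`, and `requested&.positive?` avoids a `NoMethodError`.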
#memory_add(text:, scope: nil, tags: []) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 421
def memory_add(text:, scope: nil, tags: [])
  { memory: memory_manager.add_soft(text, scope: scope || "global", tags: ) }
end
#memory_add_core(text:, scope: nil, tags: []) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 425
def memory_add_core(text:, scope: nil, tags: [])
  { memory: memory_manager.add_core(text, scope: scope || "global", tags: ) }
end
#memory_auto_summary_disable ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 412
def memory_auto_summary_disable
  memory_manager.auto_summary_disable
  { autoSummary: false }
end
#memory_auto_summary_enable ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 407
def memory_auto_summary_enable
  memory_manager.auto_summary_enable
  { autoSummary: true }
end
#memory_disable ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 402
def memory_disable
  memory_manager.disable
  { enabled: false }
end
#memory_enable ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 397
def memory_enable
  memory_manager.enable
  { enabled: true }
end
#memory_forget(id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 429
def memory_forget(id:)
  { forgotten: memory_manager.forget_memory(id) }
end
#memory_inspect ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 441
def memory_inspect
  memory_manager.inspect_memory
end
#memory_list(include_inactive: false, workspace_root: Dir.pwd) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 417
def memory_list(include_inactive: false, workspace_root: Dir.pwd)
  memory_manager.hierarchy(include_inactive: include_inactive, workspace_root: workspace_root)
end
#memory_manager ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 388
def memory_manager
  Memory::Manager.for_config_dir(@config_dir)
end
#memory_promote(id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 433
def memory_promote(id:)
  { memory: memory_manager.promote_memory(id) }
end
#memory_relax(id:, workspace_root: Dir.pwd) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 437
def memory_relax(id:, workspace_root: Dir.pwd)
  { memory: memory_manager.relax_core(id, workspace_root: workspace_root) }
end
#memory_status ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 392
def memory_status
  manager = memory_manager
  { enabled: manager.enabled?, autoSummary: manager.auto_summary_enabled?, paths: manager.paths }
end
#memory_summarize(session_id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 453
def memory_summarize(session_id:)
  rpc_session = fetch_session(session_id)
  records = memory_manager.summarize_conversation(rpc_session.conversation, client: @client)
  persist_memory_state(rpc_session)
  { memories: records }
end
#memory_why(session_id: nil) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 445
def memory_why(session_id: nil)
  if session_id
    rpc_session = fetch_session(session_id)
    return rpc_session.conversation.last_memory_retrieval || memory_manager.explain_retrieval
  end
  memory_manager.explain_retrieval
end
#navigate_tree(session_id:, entry_id:, summarize: false, custom_instructions: nil) ⇒ Object
Moves the active branch to a tree entry, optionally summarizing abandoned history.
# File 'lib/kward/rpc/session_manager.rb', line 235
def navigate_tree(session_id:, entry_id:, summarize: false, custom_instructions: nil)
  rpc_session = fetch_session(session_id)
  tree = session_tree_helper(rpc_session)
  entries = tree.entries
  resolved_entry_id = tree.resolve_entry_id(entry_id, entries: entries)
  entry = rpc_session.store.session_entry(rpc_session.session.path, resolved_entry_id)
  raise ArgumentError, "Unknown tree entryId: #{entry_id}" unless entry
  raise ArgumentError, "Tree entry is not selectable: #{entry_id}" unless tree.selectable_entry?(entry)

   = entry["message"]
  user_entry = tree.user_entry?(entry)
  target_leaf = user_entry ? entry["parentId"] : entry["id"]
  editor_text = user_entry ? () : nil
  previous_leaf = rpc_session.session.leaf_id

  if summarize
    summary = summarize_branch(rpc_session, from_id: previous_leaf, to_id: target_leaf, custom_instructions: custom_instructions)
    target_leaf = rpc_session.session.append_branch_summary(target_leaf, from_id: previous_leaf, summary: summary, details: {})
  else
    target_leaf ? rpc_session.session.branch(target_leaf) : rpc_session.session.reset_leaf
  end

  reload_rpc_session(rpc_session)
  {
    session: session_payload(rpc_session),
    editorText: editor_text,
    cancelled: false,
    aborted: false
  }.compact
end
#openrouter_catalog ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 488
def openrouter_catalog
  models = @client.respond_to?(:openrouter_catalog) ? Array(@client.openrouter_catalog) : []
  models.map { |model| normalize_model(model) }
end
#plugin_commands ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 476
def plugin_commands
  plugin_registry.commands
end
#refresh_client_config ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 564
def refresh_client_config
  @client.reload_config if @client.respond_to?(:reload_config)
  refresh_session_runtime_contexts
  refresh_session_tool_registries
end
#reload_plugins ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 570
def reload_plugins
  registry = PluginRegistry.load(reserved_commands: reserved_plugin_command_names)
  sessions = @mutex.synchronize do
    @plugin_registry = registry
    @sessions.values
  end
  sessions.each do |rpc_session|
    rpc_session.conversation.plugin_registry = registry if rpc_session.conversation.respond_to?(:plugin_registry=)
    rpc_session.conversation. if rpc_session.conversation.respond_to?(:refresh_system_message!)
    (rpc_session)
  end
end
#rename_session(session_id:, name:) ⇒ Object
Renames the persisted session attached to an RPC session id.
# File 'lib/kward/rpc/session_manager.rb', line 142
def rename_session(session_id:, name:)
  rpc_session = fetch_session(session_id)
  rpc_session.session.rename(name)
  session_payload(rpc_session)
end
#resume_session(path:, workspace_root: nil, include_transcript: false) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 109
def resume_session(path:, workspace_root: nil, include_transcript: false)
  root = validate_workspace_root(workspace_root || Dir.pwd)
  store = SessionStore.new(config_dir: @config_dir, cwd: root)
  location = store.session_location(path)
  root = validate_workspace_root(location[:cwd])
  store = SessionStore.new(config_dir: @config_dir, cwd: root)
  session, conversation = store.load(
    location[:path],
    workspace: configured_workspace(root),
    provider: current_model[:provider],
    model: current_model_id,
    reasoning_effort: current_reasoning_effort
  )
  rpc_session = build_rpc_session(store, session, conversation, root)
  remember_session(rpc_session)
  cleanup_other_unused_sessions(rpc_session)
  (rpc_session)
  payload = session_payload(rpc_session)
  payload[:messages] = TranscriptNormalizer.new(rpc_session.conversation.).normalize if include_transcript
  payload[:resumed] = true
  payload
end
#run_command(session_id:, command:, arguments: "") ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 381
def run_command(session_id:, command:, arguments: "")
  name = command.to_s.delete_prefix("/")
  return { ok: false, error: "unsupported", reason: "clientClipboardOwnedByUi" } if name == "copy"

  run_plugin_command(session_id: session_id, command: name, arguments: arguments)
end
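`run_command` accepts a command with or without a leading slash, then refuses `copy` because the clipboard belongs to the UI. A small sketch of that normalize-then-dispatch shape follows. `route_command` is hypothetical, and the `:plugin` return value stands in for the call to `run_plugin_command`.

```ruby
# Hypothetical router: strips one leading "/" and decides where the
# command goes. The real method delegates to run_plugin_command.
def route_command(command)
  name = command.to_s.delete_prefix("/")
  return { ok: false, error: "unsupported", reason: "clientClipboardOwnedByUi" } if name == "copy"

  { ok: true, target: :plugin, name: name }
end

slash_copy = route_command("/copy")
bare_copy = route_command("copy")
symbol_cmd = route_command(:deploy) # to_s makes symbols acceptable too
double_slash = route_command("//deploy")
```

`String#delete_prefix` removes at most one occurrence of the prefix, so `"//deploy"` becomes `"/deploy"`, not `"deploy"`.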
#run_plugin_command(session_id:, command:, arguments: "") ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 460
def run_plugin_command(session_id:, command:, arguments: "")
  rpc_session = fetch_session(session_id)
  command = plugin_registry.command_for(command.to_s.delete_prefix("/")) || raise(ArgumentError, "Unknown plugin command: #{command}")
  output = []
  context = PluginRegistry::Context.new(
    conversation: rpc_session.conversation,
    args: arguments.to_s,
    session: rpc_session.session,
    workspace_root: rpc_session.workspace_root,
    say_callback: lambda { || output << .to_s }
  )
  result = command.handler.call(arguments.to_s, context)
  output = rpc_session.plugin_output.shift(rpc_session.plugin_output.length) + output
  { command: command.name, output: output, result: result.nil? ? nil : result.to_s }
end
#runtime_state(session_id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 521
def runtime_state(session_id:)
  rpc_session = fetch_session(session_id)
  model = session_model(rpc_session)
  compaction_settings = self.compaction_settings
  auto_compaction_reserve_tokens = compaction_reserve_tokens(
    context_window: model[:contextWindow],
    compaction_settings: compaction_settings
  )
  session = session_payload(rpc_session)
  RuntimePayloads.state(
    session: session,
    model: model,
    streaming: streaming?(rpc_session),
    steering_supported: supports_in_flight_steer?,
    auto_compaction_reserve_tokens: auto_compaction_reserve_tokens,
    active_persona_label: active_persona_label(rpc_session),
    message_count: @session_metrics.(rpc_session.conversation),
    pending_count: pending_turn_count(rpc_session.id),
    compaction_enabled: compaction_settings.enabled,
    workspace_guardrails_enabled: workspace_guardrails_enabled?
  )
end
#runtime_stats(session_id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 544
def runtime_stats(session_id:)
  rpc_session = fetch_session(session_id)
  session = session_payload(rpc_session)
  counts = @session_metrics.(rpc_session.conversation)
  model = session_model(rpc_session)
  compaction_settings = self.compaction_settings
  auto_compaction_reserve_tokens = compaction_reserve_tokens(
    context_window: model[:contextWindow],
    compaction_settings: compaction_settings
  )
  RuntimePayloads.stats(
    session: session,
    counts: counts,
    model: model,
    auto_compaction_reserve_tokens: auto_compaction_reserve_tokens,
    context_usage: @session_metrics.context_usage(rpc_session, model, client: @client),
    compaction_enabled: compaction_settings.enabled
  )
end
#session_model(rpc_session) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 500
def session_model(rpc_session)
  current = current_model
  provider = rpc_session.conversation.provider || current[:provider]
  model = rpc_session.conversation.model || current[:id]
  reasoning_effort = rpc_session.conversation.reasoning_effort || current_reasoning_effort
  reasoning_effort = nil unless ModelInfo.reasoning_supported?(provider, model)
  context_window = current[:contextWindow] if provider == current[:provider] && model == current[:id]
  normalize_model(
    provider: provider,
    id: model,
    model: model,
    reasoningEffort: reasoning_effort,
    contextWindow: context_window,
    current: true
  )
end
#session_modified_at(session) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 591
def session_modified_at(session)
  File.exist?(session.path) ? File.mtime(session.path) : nil
end
#session_payload(rpc_session) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 583
def session_payload(rpc_session)
  RuntimePayloads.session(
    rpc_session,
    modified_at: session_modified_at(rpc_session.session),
    active_persona_label: active_persona_label(rpc_session)
  )
end
#session_tree(session_id:) ⇒ Object
Returns the flattened session tree rows consumed by RPC clients.
# File 'lib/kward/rpc/session_manager.rb', line 222
def session_tree(session_id:)
  rpc_session = fetch_session(session_id)
  { items: flatten_session_tree(rpc_session) }
end
#set_tree_label(session_id:, entry_id:, label: nil) ⇒ Object
Persists a label override for one tree entry.
# File 'lib/kward/rpc/session_manager.rb', line 228
def set_tree_label(session_id:, entry_id:, label: nil)
  rpc_session = fetch_session(session_id)
  rpc_session.session.append_label_change(entry_id, label)
  { ok: true }
end
#start_turn(session_id:, input:, streaming_behavior: nil, attachments: []) ⇒ Object
Queues or starts an async model turn for an RPC session.
streaming_behavior controls busy-session behavior: create a new turn,
queue a follow-up, or steer the running turn when the active provider
supports native steering. The returned turn id is used for status,
cancellation, and event replay.
# File 'lib/kward/rpc/session_manager.rb', line 316
def start_turn(session_id:, input:, streaming_behavior: nil, attachments: [])
  rpc_session = fetch_session(session_id)
   = ()
  plugin_command, plugin_arguments = plugin_command_turn(input, )
  display_input = input.to_s if input.is_a?(String)
  content = plugin_command ? input.to_s : user_turn_content((input), )
  streaming_behavior = validate_streaming_behavior(default_streaming_behavior(rpc_session, streaming_behavior), rpc_session: rpc_session)
  if streaming_behavior == "steer"
    steered_turn = steer_running_turn(rpc_session, content)
    return steered_turn if steered_turn

    streaming_behavior = "followUp"
  end
  turn = Turn.new(
    id: SecureRandom.uuid,
    session_id: rpc_session.id,
    input: content,
    display_input: display_input,
    status: "queued",
    cancel_requested: false,
    cancellation: Cancellation.new,
    created_at: now,
    events: [],
    next_sequence: 1,
    streaming_behavior: streaming_behavior,
    plugin_command_name: plugin_command&.name,
    plugin_arguments: plugin_arguments
  )
  @mutex.synchronize { @turns[turn.id] = turn }
  rpc_session.queue << turn.id
  ensure_worker(rpc_session)
  emit_turn_event(turn, "turnQueued", { status: "queued" })
  turn_payload(turn)
end
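`start_turn` pushes the turn id onto the session queue and makes sure a worker is running, which suggests a per-session worker thread that drains the queue in order. The sketch below shows one common way to build that with Ruby's core `Queue` and a sentinel object in the spirit of `WORKER_STOP`. It is an assumption about the general pattern, not a copy of Kward's worker, and every name in it is hypothetical.

```ruby
# A frozen bare object makes a safe sentinel: it is compared by identity,
# so it cannot collide with any real turn id.
stop_sentinel = Object.new.freeze
queue = Queue.new
processed = []

worker = Thread.new do
  loop do
    item = queue.pop # blocks until something is queued
    break if item.equal?(stop_sentinel)

    processed << item # a real worker would run the turn here
  end
end

# Turns are handled strictly in the order they were queued.
queue << "turn-1"
queue << "turn-2"
queue << stop_sentinel
worker.join
```

Pushing the sentinel through the same queue lets already-queued turns finish before the worker exits, and it avoids killing the thread in the middle of a turn.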
#transcript(session_id:) ⇒ Object
Returns the normalized transcript for the active RPC session.
# File 'lib/kward/rpc/session_manager.rb', line 305
def transcript(session_id:)
  rpc_session = fetch_session(session_id)
  { session: session_payload(rpc_session), messages: TranscriptNormalizer.new(rpc_session.conversation.).normalize }
end
#turn_events(turn_id:, after_sequence: 0) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 366
def turn_events(turn_id:, after_sequence: 0)
  turn = fetch_turn(turn_id)
  after_sequence = after_sequence.to_i
  {
    turn: turn_payload(turn),
    events: turn.events.select { |event| event[:sequence].to_i > after_sequence }
  }
end
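The `after_sequence` filter lets a client replay only the events it has not yet seen by passing back the highest sequence number it received. A minimal, self-contained sketch of that cursor pattern follows. `events_after` and the sample events are hypothetical, and apart from "turnQueued" the event type names are made up for illustration.

```ruby
# Hypothetical stand-in for the filtering step inside turn_events.
def events_after(events, after_sequence)
  cursor = after_sequence.to_i
  events.select { |event| event[:sequence].to_i > cursor }
end

# Sample events; a real payload may carry more keys than these two.
events = [
  { sequence: 1, type: "turnQueued" },
  { sequence: 2, type: "exampleProgress" },
  { sequence: 3, type: "exampleDone" }
]

first_poll = events_after(events, 0)       # everything so far
cursor = first_poll.last[:sequence]        # remember the highest sequence seen
second_poll = events_after(events, cursor) # nothing new yet
string_cursor = events_after(events, "1")  # to_i tolerates string cursors
```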
#turn_status(turn_id:) ⇒ Object
# File 'lib/kward/rpc/session_manager.rb', line 362
def turn_status(turn_id:)
  turn_payload(fetch_turn(turn_id))
end