Class: Kward::RPC::SessionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/kward/rpc/session_manager.rb

Overview

Owns RPC-visible session lifecycle, async turn queues, and frontend events.

Server handles JSON-RPC framing/dispatch; SessionManager handles the product state behind those methods. It creates/resumes SessionStore sessions, builds agents with RPC prompt bridges, serializes turn events for clients, coordinates cancellation and follow-up queues, and integrates memory/plugin hooks for RPC sessions.

Keep JSON-RPC wire shape normalization in the RPC::*Normalizer classes, persistence in SessionStore, and model/tool behavior in Agent and ToolRegistry. This class should coordinate those pieces rather than own their low-level mechanics.

Defined Under Namespace

Classes: RpcSession, Turn

Constant Summary collapse

RECENT_EVENT_LIMIT =
1_000
RPC_ATTACHMENT_MAX_BYTES =
AttachmentNormalizer::MAX_BYTES
RPC_IMAGE_MIME_TYPES =
AttachmentNormalizer::IMAGE_MIME_TYPES
STREAMING_BEHAVIORS =
["newTurn", "followUp", "steer"].freeze
1.0
WORKER_STOP =
Object.new.freeze

Instance Method Summary collapse

Constructor Details

#initialize(server:, client: Client.new, config_dir: ConfigFiles.config_dir, config_manager: ConfigManager.new(config_path: File.join(config_dir, "config.json")), context_usage: ContextUsage.new, session_trash: SessionTrash.new) ⇒ SessionManager

Creates an object for RPC session lifecycle and turn coordination.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/kward/rpc/session_manager.rb', line 65

# Stores the injected collaborators and starts with empty session and turn
# registries guarded by a single mutex.
#
# @param server [Object] kept in @server
# @param client [Object] model client used by the session methods
# @param config_dir [String] directory used for config, sessions, and memory lookups
# @param config_manager [Object] defaults to a ConfigManager for "config.json" under config_dir
# @param context_usage [Object] also handed to the SessionMetrics built here
# @param session_trash [Object] receives session paths from #delete_session
def initialize(
  server:,
  client: Client.new,
  config_dir: ConfigFiles.config_dir,
  config_manager: ConfigManager.new(config_path: File.join(config_dir, "config.json")),
  context_usage: ContextUsage.new,
  session_trash: SessionTrash.new
)
  # Registries and their guard do not depend on any argument.
  @mutex = Mutex.new
  @sessions = {}
  @turns = {}

  @server, @client = server, client
  @config_dir, @config_manager = config_dir, config_manager
  @session_trash = session_trash

  # SessionMetrics shares the very same ContextUsage instance.
  @context_usage = context_usage
  @session_metrics = SessionMetrics.new(context_usage: @context_usage)
end

Instance Method Details

#answer_question(session_id:, question_request_id:, answers:) ⇒ Object



375
376
377
378
379
# File 'lib/kward/rpc/session_manager.rb', line 375

# Hands the client's answers to the prompt object of the given RPC session.
#
# @param session_id [String] RPC session id
# @param question_request_id [Object] identifier passed through to the prompt
# @param answers [Object] answers passed through to the prompt
# @return [Hash] always { ok: true } once the prompt call returns
def answer_question(session_id:, question_request_id:, answers:)
  prompt = fetch_session(session_id).prompt
  prompt.answer(question_request_id, answers)
  { ok: true }
end

#available_models ⇒ Object



480
481
482
483
484
485
486
# File 'lib/kward/rpc/session_manager.rb', line 480

# Lists the client's models in normalized form, appending the current model
# when no listed entry has the same provider and id.
#
# @return [Array] normalized model entries
def available_models
  # Clients without #available_models contribute an empty list.
  listed = @client.respond_to?(:available_models) ? Array(@client.available_models) : []
  catalog = listed.map { |entry| normalize_model(entry) }
  active = current_model
  already_listed = catalog.any? do |entry|
    entry[:provider] == active[:provider] && entry[:id] == active[:id]
  end
  catalog << active unless already_listed
  catalog
end

#cancel_turn(turn_id:) ⇒ Object



351
352
353
354
355
356
357
358
359
360
# File 'lib/kward/rpc/session_manager.rb', line 351

# Flags a turn as cancel-requested, signals its cancellation object when one
# is present, and emits "turnCancelRequested".
#
# A turn whose status is still "queued" is finished as "canceled" right away.
#
# @param turn_id [String] id of the turn to cancel
# @return [Object] payload built by #turn_payload after the updates
def cancel_turn(turn_id:)
  turn = fetch_turn(turn_id)

  turn.cancel_requested = true
  turn.cancellation&.cancel!
  emit_turn_event(turn, "turnCancelRequested", {})

  finish_turn(turn, "canceled") if turn.status == "queued"

  turn_payload(turn)
end

#cleanup_unused_sessions ⇒ Object

Closes idle empty sessions left behind by UI lifecycle transitions.



294
295
296
297
298
299
300
301
302
# File 'lib/kward/rpc/session_manager.rb', line 294

# Closes every live session that #session_idle? reports as idle, walking a
# snapshot of the session map in reverse order.
#
# @return [Hash] always { closed: true }
def cleanup_unused_sessions
  # Snapshot under the lock so closing sessions cannot disturb the iteration.
  snapshot = @mutex.synchronize { @sessions.values.dup }
  snapshot.reverse_each do |candidate|
    close_rpc_session(candidate) if session_idle?(candidate)
  end
  { closed: true }
end

#clone_session(session_id:) ⇒ Object

Creates an independent copy of the current conversation branch.



149
150
151
152
153
154
155
156
157
# File 'lib/kward/rpc/session_manager.rb', line 149

# Copies the source session's conversation into an independent persisted
# session (recording the source as parent) and registers it as a live RPC session.
#
# @param session_id [String] RPC id of the session to copy
# @return [Object] payload built by #session_payload for the new RPC session
def clone_session(session_id:)
  source = fetch_session(session_id)
  copied_session, copied_conversation = source.store.create_independent_from_conversation(
    source.conversation,
    parent_session: source.session
  )
  copy = build_rpc_session(source.store, copied_session, copied_conversation, source.workspace_root)

  remember_session(copy)
  cleanup_other_unused_sessions(copy)
  emit_footer_update(copy)
  session_payload(copy)
end

#close_session(session_id:) ⇒ Object

Stops workers and removes an RPC session from the live session map.



287
288
289
290
291
# File 'lib/kward/rpc/session_manager.rb', line 287

# Looks up the RPC session and closes it through #close_rpc_session.
#
# @param session_id [String] RPC session id
# @return [Hash] always { closed: true }
def close_session(session_id:)
  close_rpc_session(fetch_session(session_id))
  { closed: true }
end

#compact_session(session_id:, custom_instructions: "") ⇒ Object

Compacts an RPC session and emits start/end events for UI progress.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/kward/rpc/session_manager.rb', line 160

# Runs a Compactor over the session's conversation, bracketing the work with
# "compactionStart" and "compactionEnd" session events.
#
# @param session_id [String] RPC session id
# @param custom_instructions [String] forwarded to Compactor#compact
# @return [Hash] summary, firstKeptEntryId, tokensBefore, and details (nil values dropped)
# @raise [StandardError] re-raises any failure after emitting an aborted "compactionEnd"
def compact_session(session_id:, custom_instructions: "")
  rpc_session = fetch_session(session_id)
  emit_session_event(rpc_session, "compactionStart", {})

  compactor = Compactor.new(
    conversation: rpc_session.conversation,
    client: @client,
    settings: compaction_settings
  )
  outcome = compactor.compact(custom_instructions: custom_instructions)
  payload = {
    summary: outcome.summary,
    firstKeptEntryId: outcome.first_kept_entry_id,
    tokensBefore: outcome.tokens_before,
    details: outcome.details
  }.compact

  emit_session_event(
    rpc_session,
    "compactionEnd",
    { result: payload, aborted: false, willRetry: false, errorMessage: nil }
  )
  payload
rescue StandardError => error
  # rpc_session is still nil when the lookup itself failed; there is no
  # session to notify in that case.
  if rpc_session
    emit_session_event(
      rpc_session,
      "compactionEnd",
      { result: nil, aborted: true, willRetry: false, errorMessage: error.message }
    )
  end
  raise error
end

#create_session(workspace_root: Dir.pwd, name: nil, resume_last: false) ⇒ Object

Creates a new RPC session or resumes the remembered session when allowed.

Returns the normalized session payload expected by RPC clients. The RPC session id is separate from the persisted session id so one persisted file can be closed and reopened by different client connections.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/kward/rpc/session_manager.rb', line 90

# Creates a fresh persisted session with a new conversation, or resumes the
# store's remembered last session when that is requested and allowed.
#
# Resuming is only attempted when resume_last is set, auto-resume is enabled,
# no non-blank name was given, and the store remembers a session path.
#
# @param workspace_root [String] validated through #validate_workspace_root
# @param name [String, nil] optional session name; blank names are ignored
# @param resume_last [Boolean] whether to try resuming the remembered session
# @return [Object] payload from #session_payload, or from #resume_session when resuming
def create_session(workspace_root: Dir.pwd, name: nil, resume_last: false)
  workspace_root = validate_workspace_root(workspace_root)
  store = SessionStore.new(config_dir: @config_dir, cwd: workspace_root)

  if resume_last && session_auto_resume_enabled? && name.to_s.strip.empty?
    remembered_path = store.remembered_last_session_path
    if remembered_path
      return resume_session(path: remembered_path, workspace_root: workspace_root, include_transcript: true)
    end
  end

  conversation = new_conversation(workspace_root: workspace_root)
  session = store.create(
    provider: conversation.provider,
    model: conversation.model,
    reasoning_effort: conversation.reasoning_effort
  )
  session.rename(name) unless name.to_s.strip.empty?
  session.attach(conversation)

  created = build_rpc_session(store, session, conversation, workspace_root)
  remember_session(created)
  cleanup_other_unused_sessions(created)
  emit_footer_update(created)
  session_payload(created)
end

#current_model ⇒ Object



493
494
495
496
497
498
# File 'lib/kward/rpc/session_manager.rb', line 493

# Builds the normalized description of the client's current model.
#
# Each attribute is read only when the client responds to the matching
# reader; otherwise nil is passed to #normalize_model.
#
# @return [Object] result of #normalize_model, requested with current: true
def current_model
  provider = (@client.current_provider if @client.respond_to?(:current_provider))
  model_id = (@client.current_model if @client.respond_to?(:current_model))
  window = (@client.current_context_window if @client.respond_to?(:current_context_window))

  normalize_model(
    provider: provider,
    id: model_id,
    model: model_id,
    contextWindow: window,
    current: true
  )
end

#delete_session(session_id:) ⇒ Object

Deletes the backing session file through the configured trash strategy.



278
279
280
281
282
283
284
# File 'lib/kward/rpc/session_manager.rb', line 278

# Closes the RPC session and passes its backing file path to the session trash.
#
# @param session_id [String] RPC session id
# @return [Hash] { deleted: <result of @session_trash.delete>, path: <session file path> }
def delete_session(session_id:)
  rpc_session = fetch_session(session_id)
  # Capture the path before closing the session.
  session_path = rpc_session.session.path
  close_rpc_session(rpc_session, delete_unused: false)
  { deleted: @session_trash.delete(session_path), path: session_path }
end

#export_session(session_id:, path: nil, format: nil) ⇒ Object

Exports the current transcript in markdown or JSON format.



268
269
270
271
272
273
274
275
# File 'lib/kward/rpc/session_manager.rb', line 268

# Writes the session's conversation to disk in the resolved export format.
#
# @param session_id [String] RPC session id
# @param path [String, nil] requested destination, resolved by #export_path
# @param format [String, nil] requested format, resolved by #export_format
# @return [Hash] { path: <file written>, format: <resolved format> }
def export_session(session_id:, path: nil, format: nil)
  rpc_session = fetch_session(session_id)
  resolved_format = export_format(format)
  destination = export_path(rpc_session, path, resolved_format)
  File.write(destination, export_content(rpc_session.conversation, resolved_format))
  { path: destination, format: resolved_format }
end

#fork_messages(session_id:) ⇒ Object

Lists user-message entries that can be used as fork points.



178
179
180
181
182
183
184
185
186
187
188
# File 'lib/kward/rpc/session_manager.rb', line 178

# Collects the tree entries whose message is a Hash with role "user".
#
# @param session_id [String] RPC session id
# @return [Hash] { messages: [{ entryId:, text: }, ...] } in entry order
def fork_messages(session_id:)
  rpc_session = fetch_session(session_id)
  rows = session_tree_helper(rpc_session).entries.each_with_object([]) do |record, collected|
    message = record["message"]
    next unless message.is_a?(Hash) && message_role(message) == "user"

    collected << { entryId: record["id"], text: display_message_text(message) }
  end
  { messages: rows }
end

#fork_session(session_id:, entry_id:) ⇒ Object

Creates a new session from history before the selected user message.

Raises:

  • (ArgumentError)


191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/kward/rpc/session_manager.rb', line 191

# Starts an independent session containing the messages that precede the
# selected user entry, and returns that entry's text alongside the new session.
#
# NOTE(review): unlike #clone_session, #create_session and #resume_session,
# this method does not call emit_footer_update for the new session. Confirm
# whether that is intentional.
#
# @param session_id [String] RPC id of the session to fork from
# @param entry_id [Object] entry reference, resolved via the tree helper
# @return [Hash] { session:, text:, cancelled: false }
# @raise [ArgumentError] when the entry is unknown or is not a user message
def fork_session(session_id:, entry_id:)
  source = fetch_session(session_id)
  tree = session_tree_helper(source)
  entries = tree.entries
  resolved_entry_id = tree.resolve_entry_id(entry_id, entries: entries)

  fork_index = entries.index { |record| record["id"].to_s == resolved_entry_id.to_s }
  fork_entry = fork_index ? entries[fork_index] : nil
  raise ArgumentError, "Unknown fork entryId: #{entry_id}" unless fork_entry

  message = fork_entry["message"]
  forkable = message.is_a?(Hash) && message_role(message) == "user"
  raise ArgumentError, "Entry is not forkable: #{entry_id}" unless forkable

  # Only history strictly before the selected entry is carried over.
  prior_messages = entries[0...fork_index].filter_map { |record| record["message"] }
  session, conversation = source.store.create_independent_from_messages(
    prior_messages,
    provider: source.conversation.provider,
    model: source.conversation.model,
    reasoning_effort: source.conversation.reasoning_effort,
    parent_session: source.session
  )

  forked = build_rpc_session(source.store, session, conversation, source.workspace_root)
  remember_session(forked)
  cleanup_other_unused_sessions(forked)

  { session: session_payload(forked), text: full_message_text(message), cancelled: false }
end

#in_flight_steer_supported? ⇒ Boolean

Returns:

  • (Boolean)


517
518
519
# File 'lib/kward/rpc/session_manager.rb', line 517

# Delegates to #supports_in_flight_steer?.
#
# @return [Boolean] whatever #supports_in_flight_steer? reports
def in_flight_steer_supported?
  supports_in_flight_steer?
end

#list_sessions(workspace_root: Dir.pwd, limit: nil, current_session_path: nil) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/kward/rpc/session_manager.rb', line 132

# Lists recent persisted sessions for a workspace as session-info payloads.
#
# @param workspace_root [String] validated through #validate_workspace_root
# @param limit [Object, nil] converted with #to_i; non-positive values mean no limit
# @param current_session_path [String, nil] forwarded to the store as keep_empty_path
# @return [Array] one #session_info_payload result per entry returned by the store
def list_sessions(workspace_root: Dir.pwd, limit: nil, current_session_path: nil)
  root = validate_workspace_root(workspace_root)
  store = SessionStore.new(config_dir: @config_dir, cwd: root)

  cap = limit ? limit.to_i : nil
  cap = nil unless cap&.positive?

  recent = store.recent(limit: cap, keep_empty_path: current_session_path)
  recent.map { |info| session_info_payload(info, workspace_root: root) }
end

#memory_add(text:, scope: nil, tags: []) ⇒ Object



421
422
423
# File 'lib/kward/rpc/session_manager.rb', line 421

# Adds a memory through the manager's #add_soft.
#
# @param text [String] memory text
# @param scope [String, nil] falls back to "global" when nil
# @param tags [Array] forwarded unchanged
# @return [Hash] { memory: <value returned by add_soft> }
def memory_add(text:, scope: nil, tags: [])
  effective_scope = scope || "global"
  record = memory_manager.add_soft(text, scope: effective_scope, tags: tags)
  { memory: record }
end

#memory_add_core(text:, scope: nil, tags: []) ⇒ Object



425
426
427
# File 'lib/kward/rpc/session_manager.rb', line 425

# Adds a memory through the manager's #add_core.
#
# @param text [String] memory text
# @param scope [String, nil] falls back to "global" when nil
# @param tags [Array] forwarded unchanged
# @return [Hash] { memory: <value returned by add_core> }
def memory_add_core(text:, scope: nil, tags: [])
  effective_scope = scope || "global"
  record = memory_manager.add_core(text, scope: effective_scope, tags: tags)
  { memory: record }
end

#memory_auto_summary_disable ⇒ Object



412
413
414
415
# File 'lib/kward/rpc/session_manager.rb', line 412

# Calls #auto_summary_disable on the memory manager.
#
# @return [Hash] always { autoSummary: false }
def memory_auto_summary_disable
  manager = memory_manager
  manager.auto_summary_disable
  { autoSummary: false }
end

#memory_auto_summary_enable ⇒ Object



407
408
409
410
# File 'lib/kward/rpc/session_manager.rb', line 407

# Calls #auto_summary_enable on the memory manager.
#
# @return [Hash] always { autoSummary: true }
def memory_auto_summary_enable
  manager = memory_manager
  manager.auto_summary_enable
  { autoSummary: true }
end

#memory_disable ⇒ Object



402
403
404
405
# File 'lib/kward/rpc/session_manager.rb', line 402

# Calls #disable on the memory manager.
#
# @return [Hash] always { enabled: false }
def memory_disable
  manager = memory_manager
  manager.disable
  { enabled: false }
end

#memory_enable ⇒ Object



397
398
399
400
# File 'lib/kward/rpc/session_manager.rb', line 397

# Calls #enable on the memory manager.
#
# @return [Hash] always { enabled: true }
def memory_enable
  manager = memory_manager
  manager.enable
  { enabled: true }
end

#memory_forget(id:) ⇒ Object



429
430
431
# File 'lib/kward/rpc/session_manager.rb', line 429

# Asks the memory manager to forget one memory.
#
# @param id [Object] memory identifier passed to #forget_memory
# @return [Hash] { forgotten: <value returned by forget_memory> }
def memory_forget(id:)
  outcome = memory_manager.forget_memory(id)
  { forgotten: outcome }
end

#memory_inspect ⇒ Object



441
442
443
# File 'lib/kward/rpc/session_manager.rb', line 441

# Returns the memory manager's #inspect_memory result unchanged.
#
# @return [Object] whatever Memory::Manager#inspect_memory returns
def memory_inspect
  memory_manager.inspect_memory
end

#memory_list(include_inactive: false, workspace_root: Dir.pwd) ⇒ Object



417
418
419
# File 'lib/kward/rpc/session_manager.rb', line 417

# Returns the memory manager's #hierarchy result for the given options.
#
# @param include_inactive [Boolean] forwarded unchanged
# @param workspace_root [String] forwarded unchanged (not validated here)
# @return [Object] whatever Memory::Manager#hierarchy returns
def memory_list(include_inactive: false, workspace_root: Dir.pwd)
  memory_manager.hierarchy(include_inactive: include_inactive, workspace_root: workspace_root)
end

#memory_manager ⇒ Object



388
389
390
# File 'lib/kward/rpc/session_manager.rb', line 388

# Looks up the Memory::Manager for this instance's config directory.
#
# The lookup is repeated on every call; nothing is memoized here, so any
# caching is up to Memory::Manager.for_config_dir.
#
# @return [Object] result of Memory::Manager.for_config_dir(@config_dir)
def memory_manager
  Memory::Manager.for_config_dir(@config_dir)
end

#memory_promote(id:) ⇒ Object



433
434
435
# File 'lib/kward/rpc/session_manager.rb', line 433

# Asks the memory manager to promote one memory.
#
# @param id [Object] memory identifier passed to #promote_memory
# @return [Hash] { memory: <value returned by promote_memory> }
def memory_promote(id:)
  promoted = memory_manager.promote_memory(id)
  { memory: promoted }
end

#memory_relax(id:, workspace_root: Dir.pwd) ⇒ Object



437
438
439
# File 'lib/kward/rpc/session_manager.rb', line 437

# Asks the memory manager to relax one core memory.
#
# @param id [Object] memory identifier passed to #relax_core
# @param workspace_root [String] forwarded unchanged
# @return [Hash] { memory: <value returned by relax_core> }
def memory_relax(id:, workspace_root: Dir.pwd)
  relaxed = memory_manager.relax_core(id, workspace_root: workspace_root)
  { memory: relaxed }
end

#memory_status ⇒ Object



392
393
394
395
# File 'lib/kward/rpc/session_manager.rb', line 392

# Reports the memory manager's enabled flag, auto-summary flag, and paths.
#
# @return [Hash] { enabled:, autoSummary:, paths: } read from one manager lookup
def memory_status
  memory_manager.then do |manager|
    {
      enabled: manager.enabled?,
      autoSummary: manager.auto_summary_enabled?,
      paths: manager.paths
    }
  end
end

#memory_summarize(session_id:) ⇒ Object



453
454
455
456
457
458
# File 'lib/kward/rpc/session_manager.rb', line 453

# Summarizes the session's conversation into memories, then persists the
# session's memory state.
#
# @param session_id [String] RPC session id
# @return [Hash] { memories: <value returned by summarize_conversation> }
def memory_summarize(session_id:)
  rpc_session = fetch_session(session_id)
  summarized = memory_manager.summarize_conversation(rpc_session.conversation, client: @client)
  persist_memory_state(rpc_session)
  { memories: summarized }
end

#memory_why(session_id: nil) ⇒ Object



445
446
447
448
449
450
451
# File 'lib/kward/rpc/session_manager.rb', line 445

# Explains the most recent memory retrieval.
#
# With a session id, the conversation's last_memory_retrieval is preferred;
# the manager's #explain_retrieval is used when that is missing or when no
# session id is given.
#
# @param session_id [String, nil] optional RPC session id
# @return [Object] retrieval explanation
def memory_why(session_id: nil)
  return memory_manager.explain_retrieval unless session_id

  conversation = fetch_session(session_id).conversation
  conversation.last_memory_retrieval || memory_manager.explain_retrieval
end

#navigate_tree(session_id:, entry_id:, summarize: false, custom_instructions: nil) ⇒ Object

Moves the active branch to a tree entry, optionally summarizing abandoned history.

Raises:

  • (ArgumentError)


235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/kward/rpc/session_manager.rb', line 235

# Repositions the session's active leaf on the selected tree entry.
#
# For a user entry the new leaf is the entry's parent and the entry text is
# returned as editorText; for any other entry the new leaf is the entry
# itself. With summarize set, a branch summary is appended instead of a
# plain branch/reset.
#
# @param session_id [String] RPC session id
# @param entry_id [Object] entry reference, resolved via the tree helper
# @param summarize [Boolean] whether to summarize the history being left
# @param custom_instructions [String, nil] forwarded to #summarize_branch
# @return [Hash] { session:, editorText:, cancelled: false, aborted: false } with nil values dropped
# @raise [ArgumentError] when the entry is unknown or not selectable
def navigate_tree(session_id:, entry_id:, summarize: false, custom_instructions: nil)
  rpc_session = fetch_session(session_id)
  tree = session_tree_helper(rpc_session)
  resolved_entry_id = tree.resolve_entry_id(entry_id, entries: tree.entries)

  entry = rpc_session.store.session_entry(rpc_session.session.path, resolved_entry_id)
  raise ArgumentError, "Unknown tree entryId: #{entry_id}" unless entry
  raise ArgumentError, "Tree entry is not selectable: #{entry_id}" unless tree.selectable_entry?(entry)

  if tree.user_entry?(entry)
    target_leaf = entry["parentId"]
    editor_text = full_message_text(entry["message"])
  else
    target_leaf = entry["id"]
    editor_text = nil
  end
  previous_leaf = rpc_session.session.leaf_id

  if summarize
    summary = summarize_branch(
      rpc_session,
      from_id: previous_leaf,
      to_id: target_leaf,
      custom_instructions: custom_instructions
    )
    rpc_session.session.append_branch_summary(target_leaf, from_id: previous_leaf, summary: summary, details: {})
  elsif target_leaf
    rpc_session.session.branch(target_leaf)
  else
    # No target leaf (a user entry without a parentId): reset instead of branching.
    rpc_session.session.reset_leaf
  end

  reload_rpc_session(rpc_session)

  response = {
    session: session_payload(rpc_session),
    editorText: editor_text,
    cancelled: false,
    aborted: false
  }
  response.compact
end

#openrouter_catalog ⇒ Object



488
489
490
491
# File 'lib/kward/rpc/session_manager.rb', line 488

# Lists the client's OpenRouter catalog in normalized form.
#
# @return [Array] normalized entries; empty when the client has no #openrouter_catalog
def openrouter_catalog
  return [] unless @client.respond_to?(:openrouter_catalog)

  Array(@client.openrouter_catalog).map { |entry| normalize_model(entry) }
end

#plugin_commands ⇒ Object



476
477
478
# File 'lib/kward/rpc/session_manager.rb', line 476

# Returns the commands exposed by the current plugin registry.
#
# @return [Object] whatever plugin_registry.commands returns
def plugin_commands
  plugin_registry.commands
end

#refresh_client_config ⇒ Object



564
565
566
567
568
# File 'lib/kward/rpc/session_manager.rb', line 564

# Reloads the client's config when the client supports it, then refreshes
# the runtime contexts and tool registries of the sessions.
#
# @return [Object] result of #refresh_session_tool_registries
def refresh_client_config
  if @client.respond_to?(:reload_config)
    @client.reload_config
  end

  refresh_session_runtime_contexts
  refresh_session_tool_registries
end

#reload_plugins ⇒ Object



570
571
572
573
574
575
576
577
578
579
580
581
# File 'lib/kward/rpc/session_manager.rb', line 570

# Loads a fresh plugin registry and applies it to every live session.
#
# The registry swap and the session snapshot happen under the mutex; the
# per-session updates run outside of it.
#
# @return [Array] the snapshot of sessions that were updated
def reload_plugins
  registry = PluginRegistry.load(reserved_commands: reserved_plugin_command_names)

  live_sessions = @mutex.synchronize do
    @plugin_registry = registry
    @sessions.values
  end

  live_sessions.each do |rpc_session|
    conversation = rpc_session.conversation
    # Conversations are updated only through the setters/hooks they expose.
    conversation.plugin_registry = registry if conversation.respond_to?(:plugin_registry=)
    conversation.refresh_system_message! if conversation.respond_to?(:refresh_system_message!)
    emit_footer_update(rpc_session)
  end
end

#rename_session(session_id:, name:) ⇒ Object

Renames the persisted session attached to an RPC session id.



142
143
144
145
146
# File 'lib/kward/rpc/session_manager.rb', line 142

# Renames the persisted session behind an RPC session.
#
# @param session_id [String] RPC session id
# @param name [String] new session name
# @return [Object] payload built by #session_payload after the rename
def rename_session(session_id:, name:)
  fetch_session(session_id).then do |rpc_session|
    rpc_session.session.rename(name)
    session_payload(rpc_session)
  end
end

#resume_session(path:, workspace_root: nil, include_transcript: false) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/kward/rpc/session_manager.rb', line 109

# Loads a persisted session from disk and registers it as a live RPC session.
#
# A first store, rooted at the requested workspace (or the current
# directory), is used only to locate the session; the session is then loaded
# from a store rooted at the working directory recorded in that location.
#
# @param path [String] session path, resolved via SessionStore#session_location
# @param workspace_root [String, nil] lookup root; defaults to Dir.pwd
# @param include_transcript [Boolean] adds normalized :messages to the payload
# @return [Object] session payload with :resumed set to true
def resume_session(path:, workspace_root: nil, include_transcript: false)
  lookup_root = validate_workspace_root(workspace_root || Dir.pwd)
  location = SessionStore.new(config_dir: @config_dir, cwd: lookup_root).session_location(path)

  root = validate_workspace_root(location[:cwd])
  store = SessionStore.new(config_dir: @config_dir, cwd: root)
  session, conversation = store.load(
    location[:path],
    workspace: configured_workspace(root),
    provider: current_model[:provider],
    model: current_model_id,
    reasoning_effort: current_reasoning_effort
  )

  resumed = build_rpc_session(store, session, conversation, root)
  remember_session(resumed)
  cleanup_other_unused_sessions(resumed)
  emit_footer_update(resumed)

  session_payload(resumed).tap do |payload|
    if include_transcript
      payload[:messages] = TranscriptNormalizer.new(resumed.conversation.messages).normalize
    end
    payload[:resumed] = true
  end
end

#run_command(session_id:, command:, arguments: "") ⇒ Object



381
382
383
384
385
386
# File 'lib/kward/rpc/session_manager.rb', line 381

# Runs a slash command for a session.
#
# "copy" is answered with an "unsupported" result instead of being run;
# every other name is handed to #run_plugin_command.
#
# @param session_id [String] RPC session id
# @param command [#to_s] command name, with or without a leading "/"
# @param arguments [String] raw argument text
# @return [Object] unsupported-result Hash for "copy", else the plugin command result
def run_command(session_id:, command:, arguments: "")
  name = command.to_s.delete_prefix("/")
  if name == "copy"
    return { ok: false, error: "unsupported", reason: "clientClipboardOwnedByUi" }
  end

  run_plugin_command(session_id: session_id, command: name, arguments: arguments)
end

#run_plugin_command(session_id:, command:, arguments: "") ⇒ Object



460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/kward/rpc/session_manager.rb', line 460

# Executes a registered plugin command against a session and collects its output.
#
# Output already buffered on the session (plugin_output) is drained and
# placed before the lines the handler emitted through the say callback.
#
# @param session_id [String] RPC session id
# @param command [#to_s] command name, with or without a leading "/"
# @param arguments [#to_s] raw argument text
# @return [Hash] { command:, output:, result: } where result is nil or a String
# @raise [ArgumentError] when no plugin command matches the name
def run_plugin_command(session_id:, command:, arguments: "")
  rpc_session = fetch_session(session_id)
  plugin_command = plugin_registry.command_for(command.to_s.delete_prefix("/"))
  raise ArgumentError, "Unknown plugin command: #{command}" unless plugin_command

  said = []
  context = PluginRegistry::Context.new(
    conversation: rpc_session.conversation,
    args: arguments.to_s,
    session: rpc_session.session,
    workspace_root: rpc_session.workspace_root,
    say_callback: ->(message) { said << message.to_s }
  )
  result = plugin_command.handler.call(arguments.to_s, context)

  buffered = rpc_session.plugin_output.shift(rpc_session.plugin_output.length)
  { command: plugin_command.name, output: buffered + said, result: result&.to_s }
end

#runtime_state(session_id:) ⇒ Object



521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# File 'lib/kward/rpc/session_manager.rb', line 521

# Assembles the runtime state payload for a session.
#
# @param session_id [String] RPC session id
# @return [Object] result of RuntimePayloads.state
def runtime_state(session_id:)
  rpc_session = fetch_session(session_id)
  model = session_model(rpc_session)
  settings = compaction_settings
  reserve_tokens = compaction_reserve_tokens(
    context_window: model[:contextWindow],
    compaction_settings: settings
  )

  RuntimePayloads.state(
    session: session_payload(rpc_session),
    model: model,
    streaming: streaming?(rpc_session),
    steering_supported: supports_in_flight_steer?,
    auto_compaction_reserve_tokens: reserve_tokens,
    active_persona_label: active_persona_label(rpc_session),
    message_count: @session_metrics.message_count(rpc_session.conversation),
    pending_count: pending_turn_count(rpc_session.id),
    compaction_enabled: settings.enabled,
    workspace_guardrails_enabled: workspace_guardrails_enabled?
  )
end

#runtime_stats(session_id:) ⇒ Object



544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
# File 'lib/kward/rpc/session_manager.rb', line 544

# Assembles the runtime statistics payload for a session.
#
# @param session_id [String] RPC session id
# @return [Object] result of RuntimePayloads.stats
def runtime_stats(session_id:)
  rpc_session = fetch_session(session_id)
  session = session_payload(rpc_session)
  message_counts = @session_metrics.message_stats(rpc_session.conversation)
  model = session_model(rpc_session)
  settings = compaction_settings
  reserve_tokens = compaction_reserve_tokens(
    context_window: model[:contextWindow],
    compaction_settings: settings
  )

  RuntimePayloads.stats(
    session: session,
    counts: message_counts,
    model: model,
    auto_compaction_reserve_tokens: reserve_tokens,
    context_usage: @session_metrics.context_usage(rpc_session, model, client: @client),
    compaction_enabled: settings.enabled
  )
end

#session_model(rpc_session) ⇒ Object



500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/kward/rpc/session_manager.rb', line 500

# Describes the model in effect for one session, in normalized form.
#
# Provider, model id, and reasoning effort come from the conversation, with
# the client's current values as fallbacks. Reasoning effort is cleared when
# ModelInfo reports it unsupported. The context window is only carried over
# when the session's provider and model match the client's current ones.
#
# @param rpc_session [Object] live RPC session
# @return [Object] result of #normalize_model, requested with current: true
def session_model(rpc_session)
  fallback = current_model
  conversation = rpc_session.conversation

  provider = conversation.provider || fallback[:provider]
  model_id = conversation.model || fallback[:id]

  effort = conversation.reasoning_effort || current_reasoning_effort
  effort = nil unless ModelInfo.reasoning_supported?(provider, model_id)

  matches_current = provider == fallback[:provider] && model_id == fallback[:id]

  normalize_model(
    provider: provider,
    id: model_id,
    model: model_id,
    reasoningEffort: effort,
    contextWindow: matches_current ? fallback[:contextWindow] : nil,
    current: true
  )
end

#session_modified_at(session) ⇒ Object



591
592
593
# File 'lib/kward/rpc/session_manager.rb', line 591

# Returns the modification time of the session's backing file.
#
# The existence check and the mtime read are separate filesystem calls, so
# the file can disappear in between (for example when another call deletes
# the session). That case is treated the same as a missing file.
#
# @param session [#path] object exposing the session file path
# @return [Time, nil] file mtime, or nil when the file does not exist
def session_modified_at(session)
  path = session.path
  return nil unless File.exist?(path)

  File.mtime(path)
rescue Errno::ENOENT
  # File vanished between the existence check and the mtime read.
  nil
end

#session_payload(rpc_session) ⇒ Object



583
584
585
586
587
588
589
# File 'lib/kward/rpc/session_manager.rb', line 583

# Builds the session payload for a live RPC session.
#
# @param rpc_session [Object] live RPC session
# @return [Object] result of RuntimePayloads.session
def session_payload(rpc_session)
  modified_at = session_modified_at(rpc_session.session)
  persona_label = active_persona_label(rpc_session)
  RuntimePayloads.session(rpc_session, modified_at: modified_at, active_persona_label: persona_label)
end

#session_tree(session_id:) ⇒ Object

Returns the flattened session tree rows consumed by RPC clients.



222
223
224
225
# File 'lib/kward/rpc/session_manager.rb', line 222

# Wraps the flattened tree rows of a session.
#
# @param session_id [String] RPC session id
# @return [Hash] { items: <result of flatten_session_tree> }
def session_tree(session_id:)
  { items: flatten_session_tree(fetch_session(session_id)) }
end

#set_tree_label(session_id:, entry_id:, label: nil) ⇒ Object

Persists a label override for one tree entry.



228
229
230
231
232
# File 'lib/kward/rpc/session_manager.rb', line 228

# Appends a label change for one entry to the persisted session.
#
# @param session_id [String] RPC session id
# @param entry_id [Object] entry whose label changes
# @param label [String, nil] new label, forwarded unchanged
# @return [Hash] always { ok: true }
def set_tree_label(session_id:, entry_id:, label: nil)
  persisted_session = fetch_session(session_id).session
  persisted_session.append_label_change(entry_id, label)
  { ok: true }
end

#start_turn(session_id:, input:, streaming_behavior: nil, attachments: []) ⇒ Object

Queues or starts an async model turn for an RPC session.

streaming_behavior controls busy-session behavior: create a new turn, queue a follow-up, or steer the running turn when the active provider supports native steering. The returned turn id is used for status, cancellation, and event replay.



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/kward/rpc/session_manager.rb', line 316

# Registers a new turn for the session and queues it for the session worker,
# or steers the running turn when "steer" is requested and succeeds.
#
# @param session_id [String] RPC session id
# @param input [Object] turn input; only String input is kept as display_input
# @param streaming_behavior [String, nil] requested behavior, defaulted and validated below
# @param attachments [Array] raw attachments, normalized before use
# @return [Object] the steered turn returned by #steer_running_turn, or the
#   payload built by #turn_payload for the newly queued turn
def start_turn(session_id:, input:, streaming_behavior: nil, attachments: [])
  rpc_session = fetch_session(session_id)
  normalized_attachments = normalize_attachments(attachments)
  plugin_command, plugin_arguments = plugin_command_turn(input, normalized_attachments)
  # display_input stays nil for non-String input.
  display_input = input.to_s if input.is_a?(String)
  # Plugin command turns keep the raw input text; other turns get expanded
  # prompt input combined with the attachments.
  content = plugin_command ? input.to_s : user_turn_content(expand_prompt_input(input), normalized_attachments)
  streaming_behavior = validate_streaming_behavior(default_streaming_behavior(rpc_session, streaming_behavior), rpc_session: rpc_session)
  if streaming_behavior == "steer"
    steered_turn = steer_running_turn(rpc_session, content)
    return steered_turn if steered_turn

    # Steering did not produce a turn; queue the input as a follow-up instead.
    streaming_behavior = "followUp"
  end
  turn = Turn.new(
    id: SecureRandom.uuid,
    session_id: rpc_session.id,
    input: content,
    display_input: display_input,
    status: "queued",
    cancel_requested: false,
    cancellation: Cancellation.new,
    created_at: now,
    events: [],
    next_sequence: 1,
    streaming_behavior: streaming_behavior,
    plugin_command_name: plugin_command&.name,
    plugin_arguments: plugin_arguments
  )
  # Register the turn before its id is pushed onto the session queue.
  @mutex.synchronize { @turns[turn.id] = turn }
  rpc_session.queue << turn.id
  ensure_worker(rpc_session)
  # NOTE(review): "turnQueued" is emitted after the id is queued and the
  # worker is ensured; if the worker can pick the turn up immediately, its
  # events might precede this one — confirm ordering is handled elsewhere.
  emit_turn_event(turn, "turnQueued", { status: "queued" })
  turn_payload(turn)
end

#transcript(session_id:) ⇒ Object

Returns the normalized transcript for the active RPC session.



305
306
307
308
# File 'lib/kward/rpc/session_manager.rb', line 305

# Returns the session payload together with its normalized messages.
#
# @param session_id [String] RPC session id
# @return [Hash] { session:, messages: }
def transcript(session_id:)
  rpc_session = fetch_session(session_id)
  payload = session_payload(rpc_session)
  messages = TranscriptNormalizer.new(rpc_session.conversation.messages).normalize
  { session: payload, messages: messages }
end

#turn_events(turn_id:, after_sequence: 0) ⇒ Object



366
367
368
369
370
371
372
373
# File 'lib/kward/rpc/session_manager.rb', line 366

# Returns a turn's payload plus its recorded events newer than a sequence number.
#
# @param turn_id [String] turn id
# @param after_sequence [#to_i] only events with a greater :sequence are returned
# @return [Hash] { turn:, events: }
def turn_events(turn_id:, after_sequence: 0)
  turn = fetch_turn(turn_id)
  cursor = after_sequence.to_i
  payload = turn_payload(turn)
  newer = turn.events.reject { |event| event[:sequence].to_i <= cursor }
  { turn: payload, events: newer }
end

#turn_status(turn_id:) ⇒ Object



362
363
364
# File 'lib/kward/rpc/session_manager.rb', line 362

# Returns the payload describing one turn.
#
# @param turn_id [String] turn id, looked up through #fetch_turn
# @return [Object] result of #turn_payload for that turn
def turn_status(turn_id:)
  turn_payload(fetch_turn(turn_id))
end

#validate_workspace_root(root) ⇒ Object



595
596
597
598
599
600
# File 'lib/kward/rpc/session_manager.rb', line 595

# Resolves a workspace root to its real path.
#
# An empty root (including nil) falls back to the current working directory.
#
# @param root [#to_s] requested workspace directory
# @return [String] File.realpath of the expanded directory
# @raise [RuntimeError] when the expanded path is not an existing directory
def validate_workspace_root(root)
  requested = root.to_s
  requested = Dir.pwd if requested.empty?
  expanded = File.expand_path(requested)

  unless File.directory?(expanded)
    raise "Workspace root is not an existing directory: #{expanded}"
  end

  File.realpath(expanded)
end