Class: SessionChannel
- Inherits: ApplicationCable::Channel
- Hierarchy: Object → ActionCable::Channel::Base → ApplicationCable::Channel → SessionChannel
- Defined in: app/channels/session_channel.rb
Overview
Streams messages for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts messages through this channel, and any number of clients (TUI, web, API) can subscribe.
On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.
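A minimal sketch of the frames a client might send to this channel, assuming the standard Action Cable JSON protocol. The helper names are hypothetical, and the `session_id` subscription parameter is inferred from the description of `#subscribed`; the source does not show how the parameter is read.

```ruby
require "json"

# Builds the channel identifier. Action Cable expects it as a JSON-encoded
# string, and the same string must be reused for every later frame.
# Assumption: the optional subscription parameter is named "session_id".
def channel_identifier(session_id = nil)
  params = {"channel" => "SessionChannel"}
  params["session_id"] = session_id if session_id
  JSON.generate(params)
end

# Frame that opens the subscription.
def subscribe_frame(session_id = nil)
  JSON.generate({"command" => "subscribe", "identifier" => channel_identifier(session_id)})
end

# Frame that invokes a channel action such as "speak" or "list_sessions".
# The action name travels inside the JSON-encoded "data" string.
def action_frame(action, payload = {}, session_id: nil)
  data = JSON.generate(payload.merge("action" => action))
  JSON.generate({
    "command" => "message",
    "identifier" => channel_identifier(session_id),
    "data" => data
  })
end
```

A client would send `subscribe_frame` once over the WebSocket, then `action_frame("speak", {"content" => "hello"})` for each user message.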
Constant Summary
- DEFAULT_LIST_LIMIT = 10
- MAX_LIST_LIMIT = 50
Instance Method Summary
- #change_view_mode(data) ⇒ Object
  Changes the session’s view mode and re-broadcasts the viewport.
- #create_session(_data) ⇒ Object
  Creates a new session and switches the channel stream to it.
- #interrupt_execution(_data) ⇒ Object
  Requests interruption of the current tool execution.
- #list_sessions(data) ⇒ Object
  Returns recent root sessions with nested child metadata for session picker UI.
- #recall_pending(data) ⇒ Object
  Recalls the most recent pending message for editing.
- #receive(data) ⇒ Object
  Receives messages from clients and broadcasts them to all session subscribers.
- #save_token(data) ⇒ Object
  Validates and saves an Anthropic subscription token to encrypted storage.
- #speak(data) ⇒ Object
  Processes user input by enqueuing a bounce-back-flagged user_message PendingMessage on the session.
- #subscribed ⇒ Object
  Subscribes the client to the session-specific stream.
- #switch_session(data) ⇒ Object
  Switches the channel stream to an existing session.
Instance Method Details
#change_view_mode(data) ⇒ Object
Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.
    # File 'app/channels/session_channel.rb', line 159
    def change_view_mode(data)
      mode = data["view_mode"].to_s
      return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

      session = Session.find(@current_session_id)
      session.update!(view_mode: mode)
      ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
      (session)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end
#create_session(_data) ⇒ Object
Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.
    # File 'app/channels/session_channel.rb', line 112
    def create_session(_data)
      session = Session.create!
      switch_to_session(session.id)
    end
#interrupt_execution(_data) ⇒ Object
Requests interruption of the current tool execution. Sets the interrupt_requested flag on the session — long-running tools (Tools::Bash) poll it and abort early with a synthetic “Your human wants your attention” result that satisfies the Anthropic tool_use/tool_result pairing requirement.
Cascades to running sub-agent sessions to avoid burning tokens in child jobs that the parent will discard anyway.
No-op on idle sessions — nothing to interrupt, and the flag would leak into the next round without an AASM transition to clear it.
    # File 'app/channels/session_channel.rb', line 86
    def interrupt_execution(_data)
      session = Session.find(@current_session_id)
      return if session.idle?

      session.update!(interrupt_requested: true)
      session.child_sessions.processing.update_all(interrupt_requested: true)
      ActionCable.server.broadcast(stream_name, {"action" => "interrupt_acknowledged"})
    end
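The polling side of this flag is not shown in the source. The following is a rough sketch of how a long-running tool could check it between units of work, using a hypothetical `FakeSession` struct and `run_with_interrupt_check` helper in place of the real `Session` model and `Tools::Bash`.

```ruby
# Hypothetical stand-in for a session row; only the flag matters here.
FakeSession = Struct.new(:interrupt_requested)

# Synthetic result text quoted from the method description above.
INTERRUPT_RESULT = "Your human wants your attention"

# Runs callable steps in order, checking the flag before each one.
# On interruption it still returns a result hash, so the caller always
# has something to pair with the originating tool call.
def run_with_interrupt_check(session, steps)
  completed = []
  steps.each do |step|
    if session.interrupt_requested
      return {"status" => "interrupted", "result" => INTERRUPT_RESULT, "completed" => completed}
    end
    completed << step.call
  end
  {"status" => "finished", "result" => completed.last, "completed" => completed}
end
```

In this sketch the flag is only read, never cleared. Per the description above, clearing is tied to a state transition on the session, which is why the channel refuses to set the flag on idle sessions.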
#list_sessions(data) ⇒ Object
Returns recent root sessions with nested child metadata for session picker UI. Filters to root sessions only (no parent_session_id). Child sessions are nested under their parent with name and status information.
    # File 'app/channels/session_channel.rb', line 100
    def list_sessions(data)
      limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
      sessions = Session.root_sessions.recent(limit).includes(:child_sessions)
      all_ids = sessions.flat_map { |session| [session.id] + session.child_sessions.map(&:id) }
      counts = Message.where(session_id: all_ids)..group(:session_id).count
      result = sessions.map { |session| serialize_session_with_children(session, counts) }

      transmit({"action" => "sessions_list", "sessions" => result})
    end
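The limit handling in the first line of the method can be tried in isolation. This sketch copies that expression into a hypothetical `resolve_limit` helper and redefines the two constants with the values from the Constant Summary.

```ruby
DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

# Same expression as in list_sessions: default when the key is missing,
# coerce to Integer, then clamp into 1..MAX_LIST_LIMIT.
def resolve_limit(data)
  (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
end

resolve_limit({})                  # => 10
resolve_limit({"limit" => 500})    # => 50
resolve_limit({"limit" => "25"})   # => 25
resolve_limit({"limit" => "abc"})  # => 1, because "abc".to_i is 0
resolve_limit({"limit" => -5})     # => 1
```

Non-numeric input therefore yields a single-session list rather than an error, since `String#to_i` returns 0 for strings without a leading number.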
#recall_pending(data) ⇒ Object
Recalls the most recent pending message for editing. Deletes the PendingMessage — its after_destroy_commit broadcasts removal so all clients remove the pending indicator.
    # File 'app/channels/session_channel.rb', line 65
    def recall_pending(data)
      pm_id = data["pending_message_id"].to_i
      return if pm_id <= 0

      pm = PendingMessage.find_by(id: pm_id, session_id: @current_session_id)
      pm&.destroy!
    end
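The lookup is scoped by both the message id and the current session, so a client cannot recall a pending message that belongs to another session. A small in-memory sketch of that guard, with a hypothetical `PendingStub` struct and `recallable` helper standing in for the ActiveRecord query:

```ruby
# Hypothetical stand-in for a PendingMessage row.
PendingStub = Struct.new(:id, :session_id)

# Mirrors the guard in recall_pending: coerce the id, reject non-positive
# values, then match on id AND session. Returns nil when nothing matches.
def recallable(pending, raw_id, current_session_id)
  pm_id = raw_id.to_i
  return nil if pm_id <= 0

  pending.find { |pm| pm.id == pm_id && pm.session_id == current_session_id }
end

queue = [PendingStub.new(1, 10), PendingStub.new(2, 20)]
recallable(queue, "1", 10)  # finds the first stub
recallable(queue, "2", 10)  # nil: message 2 belongs to session 20
recallable(queue, nil, 10)  # nil: nil.to_i is 0
```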
#receive(data) ⇒ Object
Receives messages from clients and broadcasts them to all session subscribers.
    # File 'app/channels/session_channel.rb', line 39
    def receive(data)
      ActionCable.server.broadcast(stream_name, data)
    end
#save_token(data) ⇒ Object
Validates and saves an Anthropic subscription token to encrypted storage. Format-validated and API-validated before storage. The token never enters the LLM context window — it flows directly from WebSocket to the secrets table.
    # File 'app/channels/session_channel.rb', line 135
    def save_token(data)
      token = data["token"].to_s.strip

      Providers::Anthropic.validate_token_format!(token)

      warning = begin
        Providers::Anthropic.validate_token_api!(token)
        nil
      rescue Providers::Anthropic::TransientError => transient
        # Token format is valid but API is temporarily unavailable (500, timeout, etc.).
        # Save the token to break the prompt loop — it will work once the API recovers.
        "Token saved but could not be verified — #{transient.}"
      end

      write_anthropic_token(token)
      transmit({"action" => "token_saved", "warning" => warning}.compact)
    rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
      transmit({"action" => "token_error", "message" => error.})
    end
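The method has three outcomes: saved, saved with a warning, and rejected. The sketch below reproduces that control flow with hypothetical stand-ins: the three error classes, the `format_check` and `api_check` callables, and the `store` array replace `Providers::Anthropic` and the secrets table, and the warning wording is simplified.

```ruby
# Hypothetical error classes standing in for the Providers::Anthropic ones.
class TokenFormatError < StandardError; end
class AuthenticationError < StandardError; end
class TransientError < StandardError; end

# Returns the payload the channel would transmit. The token is stored
# unless the format check or the API check rejects it outright.
def save_token_flow(token, format_check:, api_check:, store:)
  format_check.call(token)

  warning = begin
    api_check.call(token)
    nil
  rescue TransientError => e
    # API unreachable: keep the token, but tell the client it is unverified.
    "Token saved but could not be verified: #{e.message}"
  end

  store << token
  {"action" => "token_saved", "warning" => warning}.compact
rescue TokenFormatError, AuthenticationError => e
  {"action" => "token_error", "message" => e.message}
end
```

Because of `Hash#compact`, the `"warning"` key is present only when verification was skipped, so a client can treat its presence as the signal to show a notice.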
#speak(data) ⇒ Object
Processes user input by enqueuing a bounce-back-flagged user_message PendingMessage on the session. When the session is idle, the PM’s after_create_commit kicks off the drain pipeline (Melete → (Mneme) → DrainJob). Otherwise the PM queues silently, and the idle-wake rule on Session picks it up on the next transition to :idle. If the first LLM call after promotion fails, DrainJob emits an Events::BounceBack so the TUI can restore the text to the input.
    # File 'app/channels/session_channel.rb', line 53
    def speak(data)
      content = data["content"].to_s.strip
      return if content.empty?

      Session.find(@current_session_id).(content, bounce_back: true)
    end
#subscribed ⇒ Object
Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.
Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.
    # File 'app/channels/session_channel.rb', line 26
    def subscribed
      @current_session_id = resolve_session_id
      stream_from stream_name

      session = Session.find(@current_session_id)
      transmit_session_changed(session)
      transmit_view_mode(session)
      transmit_history(session)
    end
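The private `resolve_session_id` helper is not shown in the source. The following is a sketch of the resolution rule as described above, taking the requested id and an in-memory list of session ids (oldest first) as explicit arguments. Both arguments and the id assigned to a newly created session are assumptions for illustration.

```ruby
# requested:   raw value from the subscription params (may be nil or "0")
# session_ids: stand-in for the sessions table, oldest first; mutated
#              when a session has to be created
def resolve_session_id(requested, session_ids)
  id = requested.to_i
  # A positive id is taken at face value. Existence is not checked here;
  # in the real method the later Session.find would raise if it is missing.
  return id if id > 0

  # Omitted or zero: fall back to the most recent session,
  # creating the first one when none exist.
  session_ids << 1 if session_ids.empty?
  session_ids.last
end
```

With this rule a fresh client can subscribe without knowing any session id and learn the authoritative one from the session_changed signal.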
#switch_session(data) ⇒ Object
Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.
    # File 'app/channels/session_channel.rb', line 121
    def switch_session(data)
      target_id = data["session_id"].to_i
      return transmit_error("Session not found") unless target_id > 0

      switch_to_session(target_id)
    rescue ActiveRecord::RecordNotFound
      transmit_error("Session not found")
    end