Class: SessionChannel

Inherits:
ApplicationCable::Channel
Defined in:
app/channels/session_channel.rb

Overview

Streams messages for a specific session to connected clients. Part of the Brain/TUI separation: the Brain broadcasts messages through this channel, and any number of clients (TUI, web, API) can subscribe.

On subscription, sends the session’s chat history so the client can render previous messages without a separate API call.

Examples:

Client subscribes to a session

App.cable.subscriptions.create({ channel: "SessionChannel", session_id: 42 })
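
Clients that are not browsers (the TUI, for instance) cannot use the JavaScript helper above and have to send the underlying WebSocket frames themselves. The following is a minimal sketch of those frames, assuming the standard Action Cable protocol in which `identifier` and `data` are JSON strings nested inside the outer JSON frame. The helper names `channel_identifier`, `subscribe_frame` and `perform_frame` are hypothetical and not part of this application.

```ruby
require "json"

# Names the channel and its params. Action Cable expects this value as a
# JSON-encoded string nested inside the outer frame.
def channel_identifier(session_id)
  JSON.generate({ "channel" => "SessionChannel", "session_id" => session_id })
end

# Frame that opens the subscription (triggers #subscribed on the server).
def subscribe_frame(session_id)
  JSON.generate({ "command" => "subscribe", "identifier" => channel_identifier(session_id) })
end

# Frame that invokes a channel method. The "action" key selects the method
# (for example "speak"); the whole hash arrives as `data`.
def perform_frame(session_id, action, payload = {})
  data = JSON.generate(payload.merge("action" => action))
  JSON.generate({ "command" => "message", "identifier" => channel_identifier(session_id), "data" => data })
end

puts subscribe_frame(42)
puts perform_frame(42, "speak", "content" => "hello")
```

A frame whose data carries no "action" key is routed by Action Cable to #receive.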

Constant Summary

DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

Instance Method Details

#change_view_mode(data) ⇒ Object

Changes the session’s view mode and re-broadcasts the viewport. All clients on the session receive the mode change and fresh history.

Parameters:

  • data (Hash)

    must include “view_mode” (one of Session::VIEW_MODES)



# File 'app/channels/session_channel.rb', line 159

def change_view_mode(data)
  mode = data["view_mode"].to_s
  return transmit_error("Invalid view mode") unless Session::VIEW_MODES.include?(mode)

  session = Session.find(@current_session_id)
  session.update!(view_mode: mode)

  ActionCable.server.broadcast(stream_name, {"action" => "view_mode_changed", "view_mode" => mode})
  broadcast_viewport(session)
rescue ActiveRecord::RecordNotFound
  transmit_error("Session not found")
end

#create_session(_data) ⇒ Object

Creates a new session and switches the channel stream to it. The client receives a session_changed signal followed by (empty) history.



# File 'app/channels/session_channel.rb', line 112

def create_session(_data)
  session = Session.create!
  switch_to_session(session.id)
end

#interrupt_execution(_data) ⇒ Object

Requests interruption of the current tool execution. Sets the interrupt_requested flag on the session — long-running tools (Tools::Bash) poll it and abort early with a synthetic “Your human wants your attention” result that satisfies the Anthropic tool_use/tool_result pairing requirement.

Cascades to running sub-agent sessions to avoid burning tokens in child jobs that the parent will discard anyway.

No-op on idle sessions — nothing to interrupt, and the flag would leak into the next round without an AASM transition to clear it.

Parameters:

  • _data (Hash)

    unused



# File 'app/channels/session_channel.rb', line 86

def interrupt_execution(_data)
  session = Session.find(@current_session_id)
  return if session.idle?

  session.update!(interrupt_requested: true)
  session.child_sessions.processing.update_all(interrupt_requested: true)
  ActionCable.server.broadcast(stream_name, {"action" => "interrupt_acknowledged"})
end
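
The polling side of this contract is not shown on this page. As a rough illustration of the idea only, the sketch below runs a list of steps and checks a flag between them, returning a synthetic result when an interrupt has been requested. Everything here is an assumption: `run_with_interrupt_polling`, the result hash shape, and the `interrupt_requested` callable (a stand-in for re-reading the session's flag) are hypothetical and do not describe how Tools::Bash is actually written.

```ruby
# Runs each step in order and polls the flag before starting the next one.
# `steps` is an array of callables; `interrupt_requested` is a callable that
# stands in for reading session.interrupt_requested from the database.
def run_with_interrupt_polling(steps, interrupt_requested,
                               interrupt_text: "Your human wants your attention")
  outputs = []
  steps.each do |step|
    if interrupt_requested.call
      # Abort early but still return a result, so the pending tool call
      # gets a matching answer.
      return { "status" => "interrupted", "content" => interrupt_text, "partial" => outputs }
    end
    outputs << step.call
  end
  { "status" => "completed", "content" => outputs.join("\n") }
end

flag = false
steps = [
  -> { "step 1 done" },
  -> { flag = true; "step 2 done" }, # simulates an interrupt arriving mid-run
  -> { "step 3 done" }
]

result = run_with_interrupt_polling(steps, -> { flag })
puts result["status"]
puts result["partial"].length
```

In this run the third step never executes: the flag is set while step 2 runs, and the check before step 3 ends the loop.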

#list_sessions(data) ⇒ Object

Returns recent root sessions with nested child metadata for the session picker UI. Only root sessions (those without a parent_session_id) are listed; child sessions are nested under their parent with name and status information.

Parameters:

  • data (Hash)

    optional “limit” (default 10, max 50)



# File 'app/channels/session_channel.rb', line 100

def list_sessions(data)
  limit = (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
  sessions = Session.root_sessions.recent(limit).includes(:child_sessions)
  all_ids = sessions.flat_map { |session| [session.id] + session.child_sessions.map(&:id) }
  counts = Message.where(session_id: all_ids).llm_messages.group(:session_id).count

  result = sessions.map { |session| serialize_session_with_children(session, counts) }
  transmit({"action" => "sessions_list", "sessions" => result})
end
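
The limit handling on the first line of the method can be tried in isolation. This sketch copies that expression into a standalone helper (`resolve_limit` is a hypothetical name) to show how missing, oversized and non-numeric values are normalized.

```ruby
DEFAULT_LIST_LIMIT = 10
MAX_LIST_LIMIT = 50

# Same expression as in list_sessions: fall back to the default when the key
# is absent, coerce to an integer, then clamp into 1..MAX_LIST_LIMIT.
def resolve_limit(data)
  (data["limit"] || DEFAULT_LIST_LIMIT).to_i.clamp(1, MAX_LIST_LIMIT)
end

inputs = [{}, { "limit" => 25 }, { "limit" => "25" }, { "limit" => 0 }, { "limit" => 500 }, { "limit" => "abc" }]
inputs.each do |data|
  puts "#{data["limit"].inspect} resolves to #{resolve_limit(data)}"
end
```

These inputs resolve to 10, 25, 25, 1, 50 and 1. A non-numeric string such as "abc" becomes 0 under `to_i` and is then clamped up to 1 rather than falling back to the default.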

#recall_pending(data) ⇒ Object

Recalls the most recent pending message for editing. Deletes the PendingMessage — its after_destroy_commit broadcasts removal so all clients remove the pending indicator.

Parameters:

  • data (Hash)

    must include “pending_message_id” (positive integer)



# File 'app/channels/session_channel.rb', line 65

def recall_pending(data)
  pm_id = data["pending_message_id"].to_i
  return if pm_id <= 0

  pm = PendingMessage.find_by(id: pm_id, session_id: @current_session_id)
  pm&.destroy!
end
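
Two details of the method are easy to miss: any ID that does not coerce to a positive integer is silently ignored, and the lookup is scoped to the current session, so a client cannot recall a pending message that belongs to another session. The sketch below reproduces both behaviors against an in-memory array. `PendingStub` and `recall` are hypothetical stand-ins, not the application's PendingMessage model.

```ruby
PendingStub = Struct.new(:id, :session_id, :content)

# `store` is an array standing in for the pending messages table.
# Returns the removed row, or nil when nothing was recalled.
def recall(store, current_session_id, data)
  pm_id = data["pending_message_id"].to_i
  return nil if pm_id <= 0 # nil, "", "abc", 0 and negative values end here

  # Scoped lookup: both the ID and the owning session must match.
  pm = store.find { |row| row.id == pm_id && row.session_id == current_session_id }
  store.delete(pm) if pm
  pm
end

store = [PendingStub.new(1, 42, "draft for session 42"), PendingStub.new(2, 99, "draft for session 99")]

p recall(store, 42, { "pending_message_id" => "2" })  # belongs to session 99
p recall(store, 42, { "pending_message_id" => "abc" }) # not a positive integer
p recall(store, 42, { "pending_message_id" => 1 })
```

Only the last call removes a row; the first two return nil and leave the store untouched.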

#receive(data) ⇒ Object

Receives messages from clients and broadcasts them to all session subscribers.

Parameters:

  • data (Hash)

    arbitrary message payload



# File 'app/channels/session_channel.rb', line 39

def receive(data)
  ActionCable.server.broadcast(stream_name, data)
end

#save_token(data) ⇒ Object

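Validates an Anthropic subscription token and saves it to encrypted storage. The token is checked for format and then against the API before it is stored; if the API is only temporarily unavailable, the token is saved anyway and the response carries a warning. The token never enters the LLM context window: it flows directly from the WebSocket to the secrets table.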

Parameters:

  • data (Hash)

    must include “token” (Anthropic subscription token string)



# File 'app/channels/session_channel.rb', line 135

def save_token(data)
  token = data["token"].to_s.strip

  Providers::Anthropic.validate_token_format!(token)

  warning = begin
    Providers::Anthropic.validate_token_api!(token)
    nil
  rescue Providers::Anthropic::TransientError => transient
    # Token format is valid but API is temporarily unavailable (500, timeout, etc.).
    # Save the token to break the prompt loop — it will work once the API recovers.
    "Token saved but could not be verified — #{transient.message}"
  end

  write_anthropic_token(token)
  transmit({"action" => "token_saved", "warning" => warning}.compact)
rescue Providers::Anthropic::TokenFormatError, Providers::Anthropic::AuthenticationError => error
  transmit({"action" => "token_error", "message" => error.message})
end
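
The method has three possible outcomes, which are easier to see when the control flow is run outside Rails. The sketch below mirrors the structure of the method with injected callables. The error classes, `save_token_flow`, and the `check_format`, `check_api` and `store` parameters are hypothetical stand-ins for Providers::Anthropic and the secrets storage, and the warning text is simplified.

```ruby
# Stand-ins for the provider's error classes.
class TokenFormatError < StandardError; end
class AuthenticationError < StandardError; end
class TransientError < StandardError; end

# Returns the payload the channel would transmit. `check_format` and
# `check_api` raise on failure; `store` persists the token.
def save_token_flow(token, check_format:, check_api:, store:)
  check_format.call(token)

  # A transient API failure is downgraded to a warning instead of an error.
  warning = begin
    check_api.call(token)
    nil
  rescue TransientError => transient
    "Token saved but could not be verified: #{transient.message}"
  end

  store.call(token)
  { "action" => "token_saved", "warning" => warning }.compact
rescue TokenFormatError, AuthenticationError => error
  # Rejected tokens never reach `store`.
  { "action" => "token_error", "message" => error.message }
end

saved = []
store = ->(token) { saved << token }
accept = ->(_token) {}

p save_token_flow("good", check_format: accept, check_api: accept, store: store)
p save_token_flow("unverified", check_format: accept, check_api: ->(_t) { raise TransientError, "timeout" }, store: store)
p save_token_flow("rejected", check_format: accept, check_api: ->(_t) { raise AuthenticationError, "invalid token" }, store: store)
p saved
```

Because `compact` drops nil values, the "warning" key appears in the payload only when verification was skipped.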

#speak(data) ⇒ Object

Processes user input by enqueuing a bounce-back-flagged user_message PendingMessage on the session. When the session is idle, the PM’s after_create_commit kicks off the drain pipeline (Melete → (Mneme) → DrainJob); otherwise the PM queues silently and the idle-wake rule on Session picks it up on the next transition to :idle. If the first LLM call after promotion fails, DrainJob emits an Events::BounceBack so the TUI can restore the text to the input.

Parameters:

  • data (Hash)

    must include “content” with the user’s message text



# File 'app/channels/session_channel.rb', line 53

def speak(data)
  content = data["content"].to_s.strip
  return if content.empty?

  Session.find(@current_session_id).enqueue_user_message(content, bounce_back: true)
end
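
The queueing rule described for this method (process immediately when idle, otherwise wait for the next transition to idle) can be illustrated with a toy in-memory model. This is not the application's pipeline: `ToySession`, its methods and the `speak_sketch` helper are hypothetical, and the drain step here just moves text between arrays instead of calling an LLM. Bounce-back is not modeled.

```ruby
# Toy model: drains pending input right away when idle, otherwise holds it
# until the session becomes idle again.
class ToySession
  attr_reader :state, :pending, :processed

  def initialize
    @state = :idle
    @pending = []
    @processed = []
  end

  def enqueue_user_message(content)
    @pending << content
    drain if @state == :idle
  end

  def start_work
    @state = :processing
  end

  # Transition back to idle and pick up whatever queued in the meantime.
  def finish_work
    @state = :idle
    drain
  end

  private

  def drain
    @processed.concat(@pending)
    @pending.clear
  end
end

# Same input normalization as the channel method: blank input is dropped.
def speak_sketch(session, data)
  content = data["content"].to_s.strip
  return if content.empty?

  session.enqueue_user_message(content)
end

session = ToySession.new
speak_sketch(session, { "content" => "  first  " }) # idle: processed at once
session.start_work
speak_sketch(session, { "content" => "second" })    # busy: queued
speak_sketch(session, { "content" => "   " })       # blank: ignored
p session.pending
session.finish_work
p session.processed
```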

#subscribed ⇒ Object

Subscribes the client to the session-specific stream. When a valid session_id is provided, subscribes to that session. When omitted or zero, resolves to the most recent session (creating one if none exist) — this is the CQRS-compliant path where the server owns session resolution instead of a REST endpoint.

Always transmits a session_changed signal so the client learns the authoritative session ID, followed by view_mode and history.

Parameters:

  • params (Hash)

    optional :session_id (positive integer)



# File 'app/channels/session_channel.rb', line 26

def subscribed
  @current_session_id = resolve_session_id
  stream_from stream_name

  session = Session.find(@current_session_id)
  transmit_session_changed(session)
  transmit_view_mode(session)
  transmit_history(session)
end
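
The private resolve_session_id helper is not shown on this page. The sketch below is one possible reading of the rule stated above (use a positive session_id as given, otherwise fall back to the most recent session, creating one if none exist), written against an in-memory array. `SessionStub`, `resolve_session_id_sketch` and the integer `created_at` values are hypothetical, and the real helper may validate the requested ID differently.

```ruby
SessionStub = Struct.new(:id, :created_at)

# `sessions` is an array standing in for the sessions table.
def resolve_session_id_sketch(sessions, params)
  requested = params[:session_id].to_i
  return requested if requested > 0 # explicit choice from the client

  # Omitted or zero: the server picks the most recent session.
  latest = sessions.max_by(&:created_at)
  return latest.id if latest

  # No sessions at all: create one so the client always gets a valid ID.
  created = SessionStub.new(1, 0)
  sessions << created
  created.id
end

sessions = [SessionStub.new(1, 100), SessionStub.new(2, 200)]
p resolve_session_id_sketch(sessions, { session_id: 1 })
p resolve_session_id_sketch(sessions, {})
p resolve_session_id_sketch([], {})
```

Whichever branch is taken, the client learns the resulting ID from the session_changed signal rather than assuming the one it asked for.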

#switch_session(data) ⇒ Object

Switches the channel stream to an existing session. The client receives a session_changed signal followed by chat history.

Parameters:

  • data (Hash)

    must include “session_id” (positive integer)



# File 'app/channels/session_channel.rb', line 121

def switch_session(data)
  target_id = data["session_id"].to_i
  return transmit_error("Session not found") unless target_id > 0

  switch_to_session(target_id)
rescue ActiveRecord::RecordNotFound
  transmit_error("Session not found")
end