Class: Muxr::ControlServer
- Inherits:
-
Object
- Object
- Muxr::ControlServer
- Defined in:
- lib/muxr/control_server.rb
Overview
ControlServer is the second listener on the muxr server: it accepts multiple concurrent JSON-RPC clients over a Unix socket at ~/.muxr/sockets/<name>.ctrl.sock and lets them inspect or drive the session. The primary client of this socket is the MCP bridge (bin/muxr-mcp) that exposes the methods as MCP tools for Claude Code, but anything that can read/write NDJSON can drive a session — `nc` is enough for poking around by hand.
Wire format: one JSON object per line. Requests carry an `id`; responses echo it back. Server-pushed events (used by `pane.subscribe`) have no `id` and instead set a `method` of `"event.<topic>"`.
--> {"id":1,"method":"panes.list"}
<-- {"id":1,"result":{"panes":[...]}}
<-- {"method":"event.pane.output","params":{"pane":"a3f9b2","text":"...","cursor":{"row":0,"col":0}}}
The TTY socket (.sock) and the control socket (.ctrl.sock) are deliberately separate: the TTY socket is single-client (only one human attaches at a time), the control socket is multi-client, and a connected control client never counts as “attached” — Renderer is unaffected.
Constant Summary collapse
- PARSE_ERROR =
JSON-RPC error code conventions.
-32700
- INVALID_REQUEST =
-32600
- METHOD_NOT_FOUND =
-32601
- INVALID_PARAMS =
-32602
- INTERNAL_ERROR =
-32603
- READ_CHUNK =
64 * 1024
Instance Attribute Summary collapse
-
#app ⇒ Object
readonly
Returns the value of attribute app.
-
#socket_path ⇒ Object
readonly
Returns the value of attribute socket_path.
Instance Method Summary collapse
-
#accept_client ⇒ Object
—– internals —–.
- #add_subscription(client_io, pane_id) ⇒ Object
- #complete_run(run, timed_out:, now:) ⇒ Object
- #consume_client(io) ⇒ Object
- #drain_client(io) ⇒ Object
- #drop_client(io) ⇒ Object
- #emit_event(io, method, params) ⇒ Object
- #handle_read(io) ⇒ Object
- #handle_write(io) ⇒ Object
-
#initialize(app, socket_path) ⇒ ControlServer
constructor
A new instance of ControlServer.
- #monotonic_now ⇒ Object
-
#on_pane_output(pane_id, _data) ⇒ Object
Called whenever a pane’s PTY emits output.
- #owns?(io) ⇒ Boolean
- #pane_by_id(id) ⇒ Object
- #process_message(io, line) ⇒ Object
-
#read_ios ⇒ Object
IO arrays the Application splices into its IO.select read/write sets.
-
#register_pending_run(client_io:, request_id:, pane_id:, idle_seconds:, timeout_seconds:) ⇒ Object
Called from the Dispatcher when a pane.run request arrives.
- #remove_subscription(client_io, pane_id) ⇒ Object
- #respond_error(io, id:, code:, message:, data: nil) ⇒ Object
- #respond_result(io, id:, result:) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
-
#tick ⇒ Object
Called once per IO.select tick.
- #write_ios ⇒ Object
- #write_json(io, hash) ⇒ Object
Constructor Details
#initialize(app, socket_path) ⇒ ControlServer
Returns a new instance of ControlServer.
37 38 39 40 41 42 43 44 45 |
# File 'lib/muxr/control_server.rb', line 37
# Prepare a control server for +app+ that will listen on +socket_path+.
# Nothing is bound here; the listening socket is created by #start.
#
# @param app [Object] the owning application, also handed to the Dispatcher
# @param socket_path [String] filesystem path of the control socket
def initialize(app, socket_path)
  @socket_path = socket_path
  @app = app

  # No listening socket yet.
  @server = nil

  # Per-connection bookkeeping.
  @pending_runs = []  # in-flight pane.run waiters (populated in step 3)
  @subscriptions = {} # io => Set[pane_id] (populated in step 3)
  @clients = {}       # io => { read_buffer:, write_buffer: }

  # Built last so it receives the fully initialised server.
  @dispatcher = Dispatcher.new(app, self)
end
Instance Attribute Details
#app ⇒ Object (readonly)
Returns the value of attribute app.
47 48 49 |
# File 'lib/muxr/control_server.rb', line 47
# Reader for the application object given to the constructor.
#
# @return [Object] the value stored in @app
def app
  @app
end
#socket_path ⇒ Object (readonly)
Returns the value of attribute socket_path.
47 48 49 |
# File 'lib/muxr/control_server.rb', line 47
# Reader for the control socket path given to the constructor.
#
# @return [Object] the value stored in @socket_path
def socket_path
  @socket_path
end
Instance Method Details
#accept_client ⇒ Object
—– internals —–
173 174 175 176 177 178 |
# File 'lib/muxr/control_server.rb', line 173
# Accept one pending connection on the listening socket and register it
# in @clients with empty read/write buffers.
#
# Uses a non-blocking accept: this method is driven by an IO.select loop,
# and a blocking accept would stall the whole server if the connection
# that made the listener readable has already gone away.
#
# @return [Hash, nil] the new client's buffer state, or nil when nothing
#   was accepted
def accept_client
  # With exception: false, "nothing to accept" is reported as
  # :wait_readable instead of raising.
  sock = @server.accept_nonblock(exception: false)
  return if sock == :wait_readable

  @clients[sock] = { read_buffer: +"", write_buffer: +"".b }
rescue StandardError
  # Accept may transiently fail on a closed peer; ignore.
end
#add_subscription(client_io, pane_id) ⇒ Object
158 159 160 161 |
# File 'lib/muxr/control_server.rb', line 158
# Record that +client_io+ wants output events for +pane_id+.
# Pane ids are stored in their string form.
#
# @param client_io [Object] key identifying the subscribing client
# @param pane_id [#to_s] pane identifier
# @return [Set] the client's set of subscribed pane ids
def add_subscription(client_io, pane_id)
  subscribed = (@subscriptions[client_io] ||= Set.new)
  subscribed.add(pane_id.to_s)
end
#complete_run(run, timed_out:, now:) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/muxr/control_server.rb', line 241
# Send the final response for a pane.run waiter.
#
# If the pane can no longer be found, the client gets an INVALID_PARAMS
# error. Otherwise it gets a result with the pane's current text, the
# cursor position, and timing information.
#
# @param run [Hash] waiter record (reads :client_io, :request_id,
#   :pane_id, :had_output, :started_at)
# @param timed_out [Boolean] reported verbatim as "timed_out"
# @param now [Numeric] current clock reading, used for "elapsed_ms"
def complete_run(run, timed_out:, now:)
  client = run[:client_io]
  request_id = run[:request_id]
  pane = pane_by_id(run[:pane_id])

  if pane
    terminal = pane.terminal
    payload = {
      "pane" => pane.id.to_s,
      "timed_out" => timed_out,
      "had_output" => run[:had_output],
      "elapsed_ms" => ((now - run[:started_at]) * 1000).round,
      "text" => terminal.dump_text,
      "cursor" => { "row" => terminal.cursor_row, "col" => terminal.cursor_col }
    }
    respond_result(client, id: request_id, result: payload)
  else
    respond_error(client, id: request_id, code: INVALID_PARAMS,
                          message: "pane #{run[:pane_id]} no longer exists")
    nil
  end
end
#consume_client(io) ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/muxr/control_server.rb', line 180 def consume_client(io) state = @clients[io] return unless state begin chunk = io.read_nonblock(READ_CHUNK) rescue IO::WaitReadable return rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError drop_client(io) return end state[:read_buffer] << chunk loop do nl = state[:read_buffer].index("\n") break unless nl line = state[:read_buffer].slice!(0..nl) line.chomp! next if line.empty? (io, line) end end |
#drain_client(io) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/muxr/control_server.rb', line 202
# Push as much of a client's queued output to its socket as the socket
# will take right now. Bytes that could not be written remain in the
# client's :write_buffer. The client is dropped if the connection turns
# out to be broken.
#
# @param io [IO] a client socket registered in @clients
def drain_client(io)
  state = @clients[io]
  return unless state

  pending = state[:write_buffer]
  until pending.empty?
    written = io.write_nonblock(pending)
    if written < pending.bytesize
      # Partial write: keep only the bytes that were not sent.
      pending = state[:write_buffer] = pending.byteslice(written..-1) || +"".b
    else
      pending.clear
    end
  end
rescue IO::WaitWritable
  # Kernel send buffer full; remainder stays queued.
rescue Errno::EPIPE, Errno::ECONNRESET, IOError
  drop_client(io)
end
#drop_client(io) ⇒ Object
224 225 226 227 228 229 230 231 |
# File 'lib/muxr/control_server.rb', line 224
# Forget a client entirely: remove its buffers and subscriptions, discard
# its pending pane.run waiters, and close the socket.
#
# @param io [IO] the client socket to discard
# @return [nil]
def drop_client(io)
  [@clients, @subscriptions].each { |registry| registry.delete(io) }

  # Any pane.run waiters owned by this client are silently abandoned —
  # there's nobody to respond to.
  @pending_runs.delete_if { |run| run[:client_io] == io }

  begin
    io.close
  rescue StandardError
    nil
  end
end
#emit_event(io, method, params) ⇒ Object
298 299 300 |
# File 'lib/muxr/control_server.rb', line 298
# Queue a server-pushed notification for a client. The message carries a
# "method" and "params" but no "id".
#
# @param io [IO] destination client
# @param method [String] event name
# @param params [Object] event payload
def emit_event(io, method, params)
  notification = { "method" => method, "params" => params }
  write_json(io, notification)
end
#handle_read(io) ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/muxr/control_server.rb', line 85
# React to a readable socket: the listening socket means a new
# connection is waiting; anything else is treated as client input.
#
# @param io [IO] the socket reported readable
def handle_read(io)
  return accept_client if io == @server

  consume_client(io)
end
#handle_write(io) ⇒ Object
93 94 95 |
# File 'lib/muxr/control_server.rb', line 93
# React to a writable client socket by flushing its queued output.
#
# @param io [IO] the socket reported writable
def handle_write(io)
  drain_client(io)
end
#monotonic_now ⇒ Object
233 234 235 |
# File 'lib/muxr/control_server.rb', line 233
# Current reading of the monotonic clock, used for measuring intervals.
#
# @return [Float] seconds on the monotonic clock
def monotonic_now
  clock = Process::CLOCK_MONOTONIC
  Process.clock_gettime(clock)
end
#on_pane_output(pane_id, _data) ⇒ Object
Called whenever a pane’s PTY emits output. Drives pane.run idle detection and pane.subscribe streams.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/muxr/control_server.rb', line 101
# Notification hook for pane output.
#
# Marks every pending pane.run waiter on this pane as having produced
# output "now", then sends an "event.pane.output" message (current text
# and cursor) to each client subscribed to the pane.
#
# @param pane_id [#to_s] id of the pane that produced output
# @param _data [Object] unused
def on_pane_output(pane_id, _data)
  pid = pane_id.to_s
  stamp = monotonic_now

  @pending_runs.each do |run|
    next unless run[:pane_id] == pid

    run[:last_output_at] = stamp
    run[:had_output] = true
  end

  return if @subscriptions.empty?

  pane = pane_by_id(pid)
  return unless pane

  # Snapshot once; every subscriber receives the same payload contents.
  terminal_text = pane.terminal.dump_text
  cursor_pos = { "row" => pane.terminal.cursor_row, "col" => pane.terminal.cursor_col }

  @subscriptions.each do |client, pane_ids|
    next unless pane_ids.include?(pid)

    emit_event(client, "event.pane.output", { "pane" => pid, "text" => terminal_text, "cursor" => cursor_pos })
  end
end
#owns?(io) ⇒ Boolean
81 82 83 |
# File 'lib/muxr/control_server.rb', line 81
# Whether +io+ belongs to this server: either the listening socket or a
# registered client.
#
# @param io [Object] candidate IO
# @return [Boolean]
def owns?(io)
  return true if io == @server

  @clients.key?(io)
end
#pane_by_id(id) ⇒ Object
237 238 239 |
# File 'lib/muxr/control_server.rb', line 237
# Look up a pane of the application's session window by id. Ids are
# compared by their string form.
#
# @param id [#to_s] pane identifier
# @return [Object, nil] the first matching pane, or nil
def pane_by_id(id)
  wanted = id.to_s
  panes = @app.session.window.panes
  panes.find { |pane| pane.id.to_s == wanted }
end
#process_message(io, line) ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/muxr/control_server.rb', line 260
# Parse one request line and dispatch it.
#
# Replies with PARSE_ERROR for invalid JSON and INVALID_REQUEST when the
# payload is not a JSON object or has no string "method". Dispatcher
# errors are reported with their own code; any other exception is
# reported as INTERNAL_ERROR so that a bad request cannot take down the
# caller's loop.
#
# @param io [IO] the client the line came from
# @param line [String] one line of input, without the newline
def process_message(io, line)
  begin
    msg = JSON.parse(line)
  rescue JSON::ParserError => e
    respond_error(io, id: nil, code: PARSE_ERROR, message: "Parse error: #{e.message}")
    return
  end

  # Valid JSON that is not an object (array, number, null, ...) has no
  # "id"/"method" to read; reject it instead of raising on msg["id"].
  unless msg.is_a?(Hash)
    respond_error(io, id: nil, code: INVALID_REQUEST, message: "request must be a JSON object")
    return
  end

  id = msg["id"]
  method = msg["method"]
  params = msg["params"] || {}

  unless method.is_a?(String)
    respond_error(io, id: id, code: INVALID_REQUEST, message: "missing method")
    return
  end

  begin
    # The dispatcher may either return a Hash (synchronous result) or the
    # symbol :deferred (it will later push a response when ready — used
    # by pane.run / pane.subscribe in step 3).
    result = @dispatcher.call(method: method, params: params, client_io: io, request_id: id)
    respond_result(io, id: id, result: result) if id && result != :deferred
  rescue Dispatcher::Error => e
    respond_error(io, id: id, code: e.code, message: e.message)
  rescue StandardError => e
    respond_error(io, id: id, code: INTERNAL_ERROR, message: "#{e.class}: #{e.message}")
  end
end
#read_ios ⇒ Object
IO arrays the Application splices into its IO.select read/write sets.
68 69 70 71 72 73 |
# File 'lib/muxr/control_server.rb', line 68
# Sockets to watch for readability: the listening socket followed by all
# connected clients. Empty while there is no listening socket.
#
# @return [Array] a fresh array on every call
def read_ios
  @server ? [@server, *@clients.keys] : []
end
#register_pending_run(client_io:, request_id:, pane_id:, idle_seconds:, timeout_seconds:) ⇒ Object
Called from the Dispatcher when a pane.run request arrives. Registers a waiter that #tick later resolves.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/muxr/control_server.rb', line 143
# Add a pane.run waiter to @pending_runs. The waiter starts with no
# output seen, and its deadline is +timeout_seconds+ after the current
# monotonic clock reading.
#
# @param client_io [IO] client awaiting the response
# @param request_id [Object] id to echo in the eventual response
# @param pane_id [#to_s] pane being watched (stored as a string)
# @param idle_seconds [Numeric] quiet period stored on the waiter
# @param timeout_seconds [Numeric] added to "now" to form the deadline
# @return [Array<Hash>] the list of pending waiters
def register_pending_run(client_io:, request_id:, pane_id:, idle_seconds:, timeout_seconds:)
  started = monotonic_now
  waiter = {
    client_io: client_io,
    request_id: request_id,
    pane_id: pane_id.to_s,
    idle_seconds: idle_seconds,
    timeout_seconds: timeout_seconds,
    last_output_at: started,
    had_output: false,
    deadline_at: started + timeout_seconds,
    started_at: started
  }
  @pending_runs.push(waiter)
end
#remove_subscription(client_io, pane_id) ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/muxr/control_server.rb', line 163
# Cancel a client's subscription to a pane. When the client's last
# subscription goes away, its entry in @subscriptions is removed too.
#
# @param client_io [Object] key identifying the client
# @param pane_id [#to_s] pane identifier
# @return [Boolean] true if the subscription existed and was removed
def remove_subscription(client_io, pane_id)
  pane_ids = @subscriptions[client_io]
  return false unless pane_ids

  # Set#delete? yields nil when the element was not a member.
  was_member = !pane_ids.delete?(pane_id.to_s).nil?
  @subscriptions.delete(client_io) if pane_ids.empty?
  was_member
end
#respond_error(io, id:, code:, message:, data: nil) ⇒ Object
292 293 294 295 296 |
# File 'lib/muxr/control_server.rb', line 292
# Queue an error response for a client.
#
# @param io [IO] destination client
# @param id [Object, nil] request id to echo back
# @param code [Integer] error code
# @param message [String] human-readable description
# @param data [Object, nil] optional extra detail; left out when nil
def respond_error(io, id:, code:, message:, data: nil)
  err = { "code" => code, "message" => message }
  err["data"] = data unless data.nil?
  write_json(io, { "id" => id, "error" => err })
end
#respond_result(io, id:, result:) ⇒ Object
288 289 290 |
# File 'lib/muxr/control_server.rb', line 288
# Queue a success response for a client.
#
# @param io [IO] destination client
# @param id [Object] request id to echo back
# @param result [Object] value placed under "result"
def respond_result(io, id:, result:)
  reply = { "id" => id, "result" => result }
  write_json(io, reply)
end
#start ⇒ Object
49 50 51 52 53 |
# File 'lib/muxr/control_server.rb', line 49
# Create the listening Unix socket at @socket_path, replacing whatever
# is already at that path, and restrict it to the owner (mode 0600).
#
# The old entry is unlinked unconditionally rather than behind a
# File.exist? check: exist? follows symlinks, so a dangling symlink at
# the path would be left in place and the bind would then fail.
#
# @raise [SystemCallError] if the socket cannot be created
def start
  begin
    File.unlink(@socket_path)
  rescue Errno::ENOENT
    # Nothing to clean up.
  end

  @server = UNIXServer.new(@socket_path)

  begin
    File.chmod(0o600, @socket_path)
  rescue SystemCallError
    # Best effort: the socket stays usable with its default mode.
    nil
  end
end
#stop ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/muxr/control_server.rb', line 55
# Shut the control server down: close every client, forget all
# subscriptions and pending runs, close the listening socket, and remove
# the socket file if it is still present.
def stop
  @clients.each_key do |client|
    begin
      client.close
    rescue StandardError
      nil
    end
  end
  [@clients, @subscriptions, @pending_runs].each(&:clear)

  if @server
    begin
      @server.close
    rescue StandardError
      nil
    end
    @server = nil
  end

  File.unlink(@socket_path) if File.exist?(@socket_path)
end
#tick ⇒ Object
Called once per IO.select tick. Resolves any pane.run waiters whose idle window has elapsed or whose timeout has fired.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/muxr/control_server.rb', line 125
# Resolve pane.run waiters that are due. A waiter is due when its
# deadline has passed (reported as timed out), or when it has seen
# output and then been quiet for at least its idle window.
#
# Due waiters are taken out of @pending_runs before any response is
# sent. Responding can drop a client, which edits @pending_runs; doing
# that while the list is being iterated would skip other due waiters.
def tick
  return if @pending_runs.empty?

  now = monotonic_now
  due, waiting = @pending_runs.partition do |run|
    now >= run[:deadline_at] ||
      (run[:had_output] && (now - run[:last_output_at]) >= run[:idle_seconds])
  end
  return if due.empty?

  @pending_runs = waiting
  due.each do |run|
    # The deadline takes precedence over the idle window.
    complete_run(run, timed_out: now >= run[:deadline_at], now: now)
  end
end
#write_ios ⇒ Object
75 76 77 78 79 |
# File 'lib/muxr/control_server.rb', line 75
# Client sockets that currently have queued output, i.e. the ones worth
# watching for writability.
#
# @return [Array] a fresh array on every call
def write_ios
  @clients.reject { |_io, state| state[:write_buffer].empty? }.keys
end
#write_json(io, hash) ⇒ Object
302 303 304 305 306 307 |
# File 'lib/muxr/control_server.rb', line 302
# Serialise +hash+ as one JSON line, append it to the client's write
# buffer, and immediately try to flush. Does nothing for sockets that
# are not registered clients.
#
# @param io [IO] destination client
# @param hash [Hash] message to serialise
def write_json(io, hash)
  return unless @clients.key?(io)

  encoded = "#{JSON.generate(hash)}\n".b
  @clients[io][:write_buffer] << encoded
  drain_client(io)
end