Class: Muxr::ControlServer

Inherits:
Object
  • Object
show all
Defined in:
lib/muxr/control_server.rb

Overview

ControlServer is the second listener on the muxr server: it accepts multiple concurrent JSON-RPC clients over a Unix socket at ~/.muxr/sockets/<name>.ctrl.sock and lets them inspect or drive the session. The primary client of this socket is the MCP bridge (bin/muxr-mcp) that exposes the methods as MCP tools for Claude Code, but anything that can read/write NDJSON can drive a session — ‘nc` is enough for poking around by hand.

Wire format: one JSON object per line. Requests carry an ‘id`; responses echo it back. Server-pushed events (used by `pane.subscribe`) have no `id` and instead set a `method` of `“event.<topic>”`.

--> {"id":1,"method":"panes.list"}
<-- {"id":1,"result":{"panes":[...]}}
<-- {"method":"event.pane.output","params":{"pane":"a3f9b2","data":"..."}}

The TTY socket (.sock) and the control socket (.ctrl.sock) are deliberately separate: the TTY socket is single-client (only one human attaches at a time), the control socket is multi-client, and a connected control client never counts as “attached” — Renderer is unaffected.

Constant Summary collapse

PARSE_ERROR =

JSON-RPC error code conventions.

-32700
INVALID_REQUEST =
-32600
METHOD_NOT_FOUND =
-32601
INVALID_PARAMS =
-32602
INTERNAL_ERROR =
-32603
READ_CHUNK =
64 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, socket_path) ⇒ ControlServer

Returns a new instance of ControlServer.



37
38
39
40
41
42
43
44
45
# File 'lib/muxr/control_server.rb', line 37

def initialize(app, socket_path)
  @app = app
  @socket_path = socket_path
  @server = nil
  @clients = {}        # io => { read_buffer:, write_buffer: }
  @subscriptions = {}  # io => Set[pane_id]  (populated in step 3)
  @pending_runs  = []  # in-flight pane.run waiters (populated in step 3)
  @dispatcher = Dispatcher.new(app, self)
end

Instance Attribute Details

#appObject (readonly)

Returns the value of attribute app.



47
48
49
# File 'lib/muxr/control_server.rb', line 47

def app
  @app
end

#socket_pathObject (readonly)

Returns the value of attribute socket_path.



47
48
49
# File 'lib/muxr/control_server.rb', line 47

def socket_path
  @socket_path
end

Instance Method Details

#accept_clientObject

—– internals —–



173
174
175
176
177
178
# File 'lib/muxr/control_server.rb', line 173

def accept_client
  sock = @server.accept
  @clients[sock] = { read_buffer: +"", write_buffer: +"".b }
rescue StandardError
  # Accept may transiently fail on a closed peer; ignore.
end

#add_subscription(client_io, pane_id) ⇒ Object



158
159
160
161
# File 'lib/muxr/control_server.rb', line 158

def add_subscription(client_io, pane_id)
  @subscriptions[client_io] ||= Set.new
  @subscriptions[client_io].add(pane_id.to_s)
end

#complete_run(run, timed_out:, now:) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/muxr/control_server.rb', line 241

def complete_run(run, timed_out:, now:)
  pane = pane_by_id(run[:pane_id])
  unless pane
    respond_error(run[:client_io], id: run[:request_id],
                  code: INVALID_PARAMS,
                  message: "pane #{run[:pane_id]} no longer exists")
    return
  end
  term = pane.terminal
  respond_result(run[:client_io], id: run[:request_id], result: {
    "pane"       => pane.id.to_s,
    "timed_out"  => timed_out,
    "had_output" => run[:had_output],
    "elapsed_ms" => ((now - run[:started_at]) * 1000).round,
    "text"       => term.dump_text,
    "cursor"     => { "row" => term.cursor_row, "col" => term.cursor_col }
  })
end

#consume_client(io) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/muxr/control_server.rb', line 180

def consume_client(io)
  state = @clients[io]
  return unless state
  begin
    chunk = io.read_nonblock(READ_CHUNK)
  rescue IO::WaitReadable
    return
  rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError
    drop_client(io)
    return
  end
  state[:read_buffer] << chunk
  loop do
    nl = state[:read_buffer].index("\n")
    break unless nl
    line = state[:read_buffer].slice!(0..nl)
    line.chomp!
    next if line.empty?
    process_message(io, line)
  end
end

#drain_client(io) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/muxr/control_server.rb', line 202

def drain_client(io)
  state = @clients[io]
  return unless state
  buf = state[:write_buffer]
  return if buf.empty?
  loop do
    n = io.write_nonblock(buf)
    if n >= buf.bytesize
      buf.clear
      break
    else
      # write_nonblock returns the count actually written; slice the rest.
      state[:write_buffer] = buf.byteslice(n..-1) || +"".b
      buf = state[:write_buffer]
    end
  end
rescue IO::WaitWritable
  # Kernel send buffer full; remainder stays queued.
rescue Errno::EPIPE, Errno::ECONNRESET, IOError
  drop_client(io)
end

#drop_client(io) ⇒ Object



224
225
226
227
228
229
230
231
# File 'lib/muxr/control_server.rb', line 224

def drop_client(io)
  @clients.delete(io)
  @subscriptions.delete(io)
  # Any pane.run waiters owned by this client are silently abandoned —
  # there's nobody to respond to.
  @pending_runs.reject! { |r| r[:client_io] == io } unless @pending_runs.empty?
  io.close rescue nil
end

#emit_event(io, method, params) ⇒ Object



298
299
300
# File 'lib/muxr/control_server.rb', line 298

def emit_event(io, method, params)
  write_json(io, { "method" => method, "params" => params })
end

#handle_read(io) ⇒ Object



85
86
87
88
89
90
91
# File 'lib/muxr/control_server.rb', line 85

def handle_read(io)
  if io == @server
    accept_client
  else
    consume_client(io)
  end
end

#handle_write(io) ⇒ Object



93
94
95
# File 'lib/muxr/control_server.rb', line 93

def handle_write(io)
  drain_client(io)
end

#monotonic_nowObject



233
234
235
# File 'lib/muxr/control_server.rb', line 233

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#on_pane_output(pane_id, _data) ⇒ Object

Called whenever a pane’s PTY emits output. Drives pane.run idle detection and pane.subscribe streams.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/muxr/control_server.rb', line 101

def on_pane_output(pane_id, _data)
  pid = pane_id.to_s
  now = monotonic_now
  unless @pending_runs.empty?
    @pending_runs.each do |run|
      next unless run[:pane_id] == pid
      run[:last_output_at] = now
      run[:had_output] = true
    end
  end
  unless @subscriptions.empty?
    pane = pane_by_id(pid)
    return unless pane
    text = pane.terminal.dump_text
    cursor = { "row" => pane.terminal.cursor_row, "col" => pane.terminal.cursor_col }
    @subscriptions.each do |io, ids|
      next unless ids.include?(pid)
      emit_event(io, "event.pane.output", { "pane" => pid, "text" => text, "cursor" => cursor })
    end
  end
end

#owns?(io) ⇒ Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/muxr/control_server.rb', line 81

def owns?(io)
  io == @server || @clients.key?(io)
end

#pane_by_id(id) ⇒ Object



237
238
239
# File 'lib/muxr/control_server.rb', line 237

def pane_by_id(id)
  @app.session.window.panes.find { |p| p.id.to_s == id.to_s }
end

#process_message(io, line) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/muxr/control_server.rb', line 260

def process_message(io, line)
  msg = JSON.parse(line)
rescue JSON::ParserError => e
  respond_error(io, id: nil, code: PARSE_ERROR, message: "Parse error: #{e.message}")
  return
else
  id = msg["id"]
  method = msg["method"]
  params = msg["params"] || {}
  unless method.is_a?(String)
    respond_error(io, id: id, code: INVALID_REQUEST, message: "missing method")
    return
  end
  begin
    # The dispatcher may either return a Hash (synchronous result) or the
    # symbol :deferred (it will later push a response when ready — used
    # by pane.run / pane.subscribe in step 3).
    result = @dispatcher.call(method: method, params: params, client_io: io, request_id: id)
    respond_result(io, id: id, result: result) if id && result != :deferred
  rescue Dispatcher::Error => e
    respond_error(io, id: id, code: e.code, message: e.message)
  rescue StandardError => e
    respond_error(io, id: id, code: INTERNAL_ERROR, message: "#{e.class}: #{e.message}")
  end
end

#read_iosObject

IO arrays the Application splices into its IO.select read/write sets.



68
69
70
71
72
73
# File 'lib/muxr/control_server.rb', line 68

def read_ios
  return [] unless @server
  ios = [@server]
  ios.concat(@clients.keys)
  ios
end

#register_pending_run(client_io:, request_id:, pane_id:, idle_seconds:, timeout_seconds:) ⇒ Object

Called from the Dispatcher when a pane.run request arrives. Registers a waiter that #tick later resolves.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/muxr/control_server.rb', line 143

def register_pending_run(client_io:, request_id:, pane_id:, idle_seconds:, timeout_seconds:)
  now = monotonic_now
  @pending_runs << {
    client_io: client_io,
    request_id: request_id,
    pane_id: pane_id.to_s,
    idle_seconds: idle_seconds,
    timeout_seconds: timeout_seconds,
    last_output_at: now,
    had_output: false,
    deadline_at: now + timeout_seconds,
    started_at: now
  }
end

#remove_subscription(client_io, pane_id) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/muxr/control_server.rb', line 163

def remove_subscription(client_io, pane_id)
  set = @subscriptions[client_io]
  return false unless set
  removed = set.delete?(pane_id.to_s)
  @subscriptions.delete(client_io) if set.empty?
  !removed.nil?
end

#respond_error(io, id:, code:, message:, data: nil) ⇒ Object



292
293
294
295
296
# File 'lib/muxr/control_server.rb', line 292

def respond_error(io, id:, code:, message:, data: nil)
  err = { "code" => code, "message" => message }
  err["data"] = data unless data.nil?
  write_json(io, { "id" => id, "error" => err })
end

#respond_result(io, id:, result:) ⇒ Object



288
289
290
# File 'lib/muxr/control_server.rb', line 288

def respond_result(io, id:, result:)
  write_json(io, { "id" => id, "result" => result })
end

#startObject



49
50
51
52
53
# File 'lib/muxr/control_server.rb', line 49

def start
  File.unlink(@socket_path) if File.exist?(@socket_path)
  @server = UNIXServer.new(@socket_path)
  File.chmod(0o600, @socket_path) rescue nil
end

#stopObject



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/muxr/control_server.rb', line 55

def stop
  @clients.each_key { |c| c.close rescue nil }
  @clients.clear
  @subscriptions.clear
  @pending_runs.clear
  if @server
    @server.close rescue nil
    @server = nil
  end
  File.unlink(@socket_path) if File.exist?(@socket_path)
end

#tickObject

Called once per IO.select tick. Resolves any pane.run waiters whose idle window has elapsed or whose timeout has fired.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/muxr/control_server.rb', line 125

def tick
  return if @pending_runs.empty?
  now = monotonic_now
  completed = []
  @pending_runs.each do |run|
    if now >= run[:deadline_at]
      complete_run(run, timed_out: true, now: now)
      completed << run
    elsif run[:had_output] && (now - run[:last_output_at]) >= run[:idle_seconds]
      complete_run(run, timed_out: false, now: now)
      completed << run
    end
  end
  @pending_runs -= completed unless completed.empty?
end

#write_iosObject



75
76
77
78
79
# File 'lib/muxr/control_server.rb', line 75

def write_ios
  @clients.each_with_object([]) do |(io, state), acc|
    acc << io unless state[:write_buffer].empty?
  end
end

#write_json(io, hash) ⇒ Object



302
303
304
305
306
307
# File 'lib/muxr/control_server.rb', line 302

def write_json(io, hash)
  return unless @clients.key?(io)
  line = JSON.generate(hash) + "\n"
  @clients[io][:write_buffer] << line.b
  drain_client(io)
end