Class: RobotLab::MCP::ConnectionPoller Private
- Inherits: Object
- Defined in: lib/robot_lab/mcp/connection_poller.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Multiplexes I/O across multiple stdio MCP transports.
When a robot connects to more than one local (stdio) MCP server, the default approach blocks independently per client, each with its own Timeout.timeout wrapper. ConnectionPoller replaces this with a single IO.select call across all registered stdout file descriptors, dispatching each response to the pending request for that client.
Async-based transports (SSE, WebSocket, StreamableHTTP) are unaffected — they already use the Async fiber scheduler.
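The multiplexing idea described above can be sketched with plain pipes. This is a minimal illustration, not the ConnectionPoller implementation: the names collect_responses and demo_multiplex, the max_polls cap, and the use of IO.pipe in place of real MCP server processes are all assumptions made for the example.

```ruby
require "json"

# Reads one newline-delimited JSON message from each IO in names_by_io
# (a hypothetical Hash of IO => label). A single IO.select call per pass
# watches every descriptor that has not answered yet.
def collect_responses(names_by_io, poll_interval: 0.1, max_polls: 50)
  responses = {}
  max_polls.times do
    break if responses.size == names_by_io.size

    pending = names_by_io.keys.reject { |io| responses.key?(names_by_io[io]) }
    ready, = IO.select(pending, nil, nil, poll_interval)
    next if ready.nil? # nothing readable within poll_interval; poll again

    ready.each do |io|
      line = io.gets
      responses[names_by_io[io]] = JSON.parse(line) if line
    end
  end
  responses
end

# Two pipes stand in for the stdout streams of two stdio servers.
def demo_multiplex
  names_by_io = {}
  writers = %w[alpha beta].each_with_index.map do |name, index|
    reader, writer = IO.pipe
    names_by_io[reader] = name
    writer.puts({ jsonrpc: "2.0", id: index + 1, result: "from #{name}" }.to_json)
    writer
  end
  collect_responses(names_by_io)
ensure
  names_by_io.keys.each(&:close)
  writers&.each(&:close)
end
```

Calling demo_multiplex returns a Hash keyed by the labels "alpha" and "beta", each holding the parsed message read from its pipe. The 0.1 default mirrors POLL_INTERVAL; the cap on polls exists only so the sketch cannot loop forever.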
Usage
poller = MCP::ConnectionPoller.new.start
client = MCP::Client.new(server_config, poller: poller)
client.connect # registers transport with poller
client.call_tool(…)
poller.stop
Constant Summary
- POLL_INTERVAL = 0.1
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Seconds between IO.select checks.
Instance Method Summary
- #initialize ⇒ ConnectionPoller (constructor, private): Creates a new ConnectionPoller.
- #register(client) ⇒ void (private): Register an MCP client with the poller.
- #running? ⇒ Boolean (private): Whether the poller thread is running.
- #send_request(client, message, timeout:) ⇒ Hash (private): Send a JSON-RPC request via the poller and block until the response.
- #start ⇒ self (private): Start the multiplexing thread.
- #stop(timeout: 5) ⇒ self (private): Stop the multiplexing thread.
- #unregister(client) ⇒ void (private): Unregister a client.
Constructor Details
#initialize ⇒ ConnectionPoller
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates a new ConnectionPoller.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 29

    def initialize
      @mutex = Mutex.new
      @clients = {} # IO => { client:, queue: Thread::Queue }
      @running = false
      @thread = nil
    end
Instance Method Details
#register(client) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Register an MCP client with the poller.
Only stdio clients are registered — others are silently ignored.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 79

    def register(client)
      return unless stdio_client?(client)

      io = client.transport.stdout
      @mutex.synchronize { @clients[io] = { client: client, queue: nil } }
    end
#running? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Whether the poller thread is running.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 138

    def running?
      @mutex.synchronize { @running }
    end
#send_request(client, message, timeout:) ⇒ Hash
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a JSON-RPC request via the poller and block until the response.
Writes to the client’s stdin, registers a response queue, then blocks until the poll loop dispatches the response.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 107

    def send_request(client, message, timeout:)
      io = client.transport.stdout
      queue = Thread::Queue.new

      @mutex.synchronize { @clients[io][:queue] = queue }

      begin
        client.transport.stdin.puts(message.to_json)
        client.transport.stdin.flush
      rescue Errno::EPIPE, IOError => e
        @mutex.synchronize { @clients[io][:queue] = nil }
        raise MCPError, "MCP connection lost: #{e.message}"
      end

      response = Timeout.timeout(timeout) { queue.pop }

      if response.is_a?(Hash) && response[:error]
        raise MCPError, response[:error]
      end

      response
    rescue Timeout::Error
      @mutex.synchronize { @clients[io]&.[]= :queue, nil }
      raise MCPError, "MCP server did not respond within #{timeout}s"
    ensure
      @mutex.synchronize { @clients[io]&.[]= :queue, nil }
    end
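The blocking handoff in send_request can be illustrated in isolation. The sketch below assumes a background thread plays the role of the poll loop; await_response and demo_handoff are hypothetical names, and plain RuntimeError stands in for MCPError so the example runs without the library.

```ruby
require "timeout"

# Waits for a value on the queue, in the same shape as the
# Timeout.timeout(timeout) { queue.pop } call in send_request.
# Raises RuntimeError (a stand-in for MCPError) on timeout or on an
# error payload.
def await_response(queue, timeout:)
  response = Timeout.timeout(timeout) { queue.pop }
  if response.is_a?(Hash) && response[:error]
    raise "request failed: #{response[:error]}"
  end
  response
rescue Timeout::Error
  raise "no response within #{timeout}s"
end

# A dispatcher thread pushes the response shortly after the caller
# starts waiting, the way a poll loop would after reading a line.
def demo_handoff
  queue = Thread::Queue.new
  dispatcher = Thread.new do
    sleep 0.05
    queue.push({ "id" => 1, "result" => "ok" })
  end
  await_response(queue, timeout: 1)
ensure
  dispatcher&.join
end
```

The caller thread sleeps inside queue.pop until either the dispatcher pushes a value or the timeout fires, so no busy-waiting is needed on the request side.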
#start ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Start the multiplexing thread.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 39

    def start
      @mutex.synchronize do
        return self if @running

        @running = true
        @thread = Thread.new { poll_loop }
        @thread.name = "RobotLab::MCP::ConnectionPoller"
      end
      self
    end
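The guard at the top of start makes repeated calls harmless. A minimal sketch of that idempotent-start pattern follows, using a hypothetical TinyWorker class whose loop only sleeps; the starts counter is an assumption added to make the behavior observable.

```ruby
# Hypothetical worker showing a mutex-guarded, idempotent start.
class TinyWorker
  attr_reader :starts

  def initialize
    @mutex = Mutex.new
    @running = false
    @thread = nil
    @starts = 0 # counts how many times a thread was actually spawned
  end

  def start
    @mutex.synchronize do
      return self if @running # already started: do nothing

      @running = true
      @starts += 1
      @thread = Thread.new { sleep 0.01 while running? }
    end
    self
  end

  def running?
    @mutex.synchronize { @running }
  end

  def stop
    @mutex.synchronize { @running = false }
    @thread&.join
    self
  end
end
```

Because start returns self in both branches, calls can be chained, as in TinyWorker.new.start, regardless of whether the worker was already running.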
#stop(timeout: 5) ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stop the multiplexing thread.
Cancels all pending requests with an MCPError.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 56

    def stop(timeout: 5)
      @mutex.synchronize do
        return self unless @running

        @running = false
        # Unblock any pending request queues
        @clients.each_value do |entry|
          entry[:queue]&.push({ error: "ConnectionPoller stopped" })
        end
      end

      @thread&.join(timeout)
      @thread = nil
      self
    end
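The cancellation step in stop works by pushing an error payload into every queue that has a waiter. The following sketch shows that mechanism on its own; cancel_pending and demo_cancel are hypothetical names, the registry is a plain Hash in the same shape as @clients, and the mutex is omitted for brevity.

```ruby
# Pushes an error payload into every pending queue. Entries whose
# queue is nil (no request in flight) are skipped by the safe
# navigation operator.
def cancel_pending(entries, reason)
  entries.each_value do |entry|
    entry[:queue]&.push({ error: reason })
  end
end

# One idle entry and one entry with a thread blocked on queue.pop.
def demo_cancel
  entries = {
    idle: { queue: nil },
    waiting: { queue: Thread::Queue.new }
  }
  waiter = Thread.new { entries[:waiting][:queue].pop }
  cancel_pending(entries, "ConnectionPoller stopped")
  waiter.value # the payload the blocked thread woke up with
end
```

A caller blocked in queue.pop wakes with the error Hash and can turn it into an exception, which is what the response[:error] check in send_request does.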
#unregister(client) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Unregister a client.
    # File 'lib/robot_lab/mcp/connection_poller.rb', line 90

    def unregister(client)
      return unless stdio_client?(client)

      io = client.transport.stdout
      @mutex.synchronize { @clients.delete(io) }
    end