Class: RobotLab::MCP::ConnectionPoller Private

Inherits:
Object
Defined in:
lib/robot_lab/mcp/connection_poller.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Multiplexes I/O across multiple stdio MCP transports.

When a robot connects to more than one local (stdio) MCP server, the default approach blocks independently per client, each with its own Timeout.timeout wrapper. ConnectionPoller replaces this with a single IO.select call across all registered stdout file descriptors, dispatching each response to the pending request for that client.

Async-based transports (SSE, WebSocket, StreamableHTTP) are unaffected — they already use the Async fiber scheduler.
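
The single-IO.select approach described above can be illustrated with a minimal sketch. This is not the library's poll loop, which is not shown on this page. The helper poll_once is hypothetical, and two IO.pipe pairs stand in for the stdout streams of two stdio MCP servers. The sketch also assumes each response arrives as one complete JSON line.

```ruby
require "json"

# Hypothetical helper (not part of RobotLab): wait on several readable IOs
# with one IO.select call and return { io => parsed_line } for each IO
# that produced a line within the interval.
def poll_once(ios, interval)
  ready, = IO.select(ios, nil, nil, interval)
  return {} if ready.nil? # IO.select returns nil when the interval elapses

  ready.each_with_object({}) do |io, out|
    line = io.gets
    out[io] = JSON.parse(line) if line
  end
end

# Two pipes stand in for the stdout streams of two stdio servers.
r1, w1 = IO.pipe
r2, w2 = IO.pipe

# Only the second "server" replies.
w2.puts({ "id" => 7, "result" => "pong" }.to_json)
w2.flush

responses = poll_once([r1, r2], 0.1)
responses.each_value { |payload| puts payload.inspect }
```

One call covers every registered descriptor, so an idle server costs nothing beyond its entry in the array passed to IO.select. Reading with gets after IO.select can still block if a server writes only part of a line, which is why the sketch assumes whole-line responses.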

Usage

poller = MCP::ConnectionPoller.new.start
client = MCP::Client.new(server_config, poller: poller)
client.connect     # registers transport with poller
client.call_tool()
poller.stop

Constant Summary

POLL_INTERVAL =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

seconds between IO.select checks

0.1

Constructor Details

#initialize ⇒ ConnectionPoller

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a new ConnectionPoller.



# File 'lib/robot_lab/mcp/connection_poller.rb', line 29

def initialize
  @mutex    = Mutex.new
  @clients  = {}    # IO => { client:, queue: Thread::Queue }
  @running  = false
  @thread   = nil
end

Instance Method Details

#register(client) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Register an MCP client with the poller.

Only stdio clients are registered — others are silently ignored.

Parameters:

  • client (MCP::Client)

# File 'lib/robot_lab/mcp/connection_poller.rb', line 79

def register(client)
  return unless stdio_client?(client)

  io = client.transport.stdout
  @mutex.synchronize { @clients[io] = { client: client, queue: nil } }
end

#running? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the poller thread is running.

Returns:

  • (Boolean)


# File 'lib/robot_lab/mcp/connection_poller.rb', line 138

def running?
  @mutex.synchronize { @running }
end

#send_request(client, message, timeout:) ⇒ Hash

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Send a JSON-RPC request via the poller and block until the response arrives.

Writes to the client’s stdin, registers a response queue, then blocks until the poll loop dispatches the response.

Parameters:

  • client (MCP::Client)
  • message (Hash)

    JSON-RPC message

  • timeout (Numeric)

    seconds before raising MCPError

Returns:

  • (Hash)

    parsed response

Raises:

  • (MCPError)

    on timeout or connection error



# File 'lib/robot_lab/mcp/connection_poller.rb', line 107

def send_request(client, message, timeout:)
  io      = client.transport.stdout
  queue   = Thread::Queue.new

  @mutex.synchronize { @clients[io][:queue] = queue }

  begin
    client.transport.stdin.puts(message.to_json)
    client.transport.stdin.flush
  rescue Errno::EPIPE, IOError => e
    @mutex.synchronize { @clients[io][:queue] = nil }
    raise MCPError, "MCP connection lost: #{e.message}"
  end

  response = Timeout.timeout(timeout) { queue.pop }

  if response.is_a?(Hash) && response[:error]
    raise MCPError, response[:error]
  end

  response
rescue Timeout::Error
  @mutex.synchronize { @clients[io]&.[]= :queue, nil }
  raise MCPError, "MCP server did not respond within #{timeout}s"
ensure
  @mutex.synchronize { @clients[io]&.[]= :queue, nil }
end
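
The blocking half of send_request, where the caller waits on a Thread::Queue until another thread delivers the response, can be sketched in isolation. The helper await_response is hypothetical, and a plain RuntimeError stands in for MCPError, whose definition is not shown on this page. The background thread plays the role of the poll loop.

```ruby
require "timeout"

# Hypothetical stand-in for the waiting step of send_request: block on the
# queue and turn a timeout into a RuntimeError (MCPError in the real class).
def await_response(queue, timeout)
  Timeout.timeout(timeout) { queue.pop }
rescue Timeout::Error
  raise "no response within #{timeout}s"
end

queue = Thread::Queue.new

# A background thread plays the poll loop and delivers the response.
dispatcher = Thread.new do
  sleep 0.05
  queue.push({ "id" => 1, "result" => "ok" })
end

response = await_response(queue, 1)
dispatcher.join

# With nothing to deliver, the wait ends with an error instead of hanging.
timed_out = begin
  await_response(Thread::Queue.new, 0.05)
  false
rescue RuntimeError
  true
end
```

Giving each in-flight request its own queue keeps the hand-off simple: the waiting side only ever pops, and the dispatching side only ever pushes.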

#start ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Start the multiplexing thread.

Returns:

  • (self)


# File 'lib/robot_lab/mcp/connection_poller.rb', line 39

def start
  @mutex.synchronize do
    return self if @running

    @running = true
    @thread  = Thread.new { poll_loop }
    @thread.name = "RobotLab::MCP::ConnectionPoller"
  end
  self
end

#stop(timeout: 5) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stop the multiplexing thread.

Cancels all pending requests with an MCPError.

Parameters:

  • timeout (Numeric) (defaults to: 5)

    seconds to wait for thread to finish

Returns:

  • (self)


# File 'lib/robot_lab/mcp/connection_poller.rb', line 56

def stop(timeout: 5)
  @mutex.synchronize do
    return self unless @running

    @running = false

    # Unblock any pending request queues
    @clients.each_value do |entry|
      entry[:queue]&.push({ error: "ConnectionPoller stopped" })
    end
  end

  @thread&.join(timeout)
  @thread = nil
  self
end
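
How stop releases callers that are still waiting can be seen in a small sketch. The pending hash below is a hypothetical registry shaped like the poller's @clients entries, with symbol keys in place of IO objects. The waiter thread stands in for a caller blocked inside send_request.

```ruby
# Hypothetical registry shaped like the poller's @clients hash;
# symbols replace the IO keys used by the real class.
mutex   = Mutex.new
pending = {
  a: { queue: Thread::Queue.new }, # a request is in flight
  b: { queue: nil }                # idle client, nothing to cancel
}

# A caller blocked on its queue, as send_request would be.
waiter = Thread.new { pending[:a][:queue].pop }

# Shutdown: push an error marker to every queue that exists.
mutex.synchronize do
  pending.each_value do |entry|
    entry[:queue]&.push({ error: "ConnectionPoller stopped" })
  end
end

# Thread#value joins the thread and returns what pop received.
result = waiter.value
puts result[:error]
```

The waiting side then only needs to check for an :error key, which is the check send_request performs before returning.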

#unregister(client) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Unregister a client.

Parameters:

  • client (MCP::Client)

# File 'lib/robot_lab/mcp/connection_poller.rb', line 90

def unregister(client)
  return unless stdio_client?(client)

  io = client.transport.stdout
  @mutex.synchronize { @clients.delete(io) }
end