Class: Legate::Mcp::Connection::Stdio
- Inherits:
-
Object
- Object
- Legate::Mcp::Connection::Stdio
- Defined in:
- lib/legate/mcp/connection/stdio.rb
Overview
Manages a connection to an MCP server via STDIO.
Constant Summary collapse
- PROCESS_START_TIMEOUT =
How long to wait for process startup or initial output
5- READ_TIMEOUT =
How long to wait when reading a response line
10- PARSE_ERROR_THRESHOLD =
Max consecutive JSON parse errors before considering connection broken
5
Instance Attribute Summary collapse
-
#args ⇒ Object
readonly
Returns the value of attribute args.
-
#command ⇒ Object
readonly
Returns the value of attribute command.
-
#last_error ⇒ Object
readonly
Returns the value of attribute last_error.
Instance Method Summary collapse
-
#connect ⇒ Object
Connects to the MCP server process.
- #connected? ⇒ Boolean
-
#disconnect(force: false, timeout: 5) ⇒ Object
Disconnects from the server process.
-
#initialize(command:, args: []) ⇒ Stdio
constructor
A new instance of Stdio.
-
#next_request_id ⇒ Integer
Generates the next unique request ID.
-
#read_message(timeout = READ_TIMEOUT) ⇒ Hash?
Reads the next available response or notification.
-
#send_request(json_rpc_hash) ⇒ Object
Sends a JSON-RPC request object to the server process.
Constructor Details
#initialize(command:, args: []) ⇒ Stdio
Returns a new instance of Stdio.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/legate/mcp/connection/stdio.rb', line 22 def initialize(command:, args: []) @command = command @args = args @stdin = nil @stdout = nil @stderr = nil @wait_thr = nil @stderr_thread = nil @connected = false @request_id_counter = 0 @response_queue = Queue.new # <-- Back to response_queue @notification_queue = Queue.new # <-- Back to notification_queue @stdout_reader_thread = nil @last_error = nil @pid = nil @consecutive_parse_errors = 0 # Initialize counter end |
Instance Attribute Details
#args ⇒ Object (readonly)
Returns the value of attribute args.
20 21 22 |
# File 'lib/legate/mcp/connection/stdio.rb', line 20 def args @args end |
#command ⇒ Object (readonly)
Returns the value of attribute command.
20 21 22 |
# File 'lib/legate/mcp/connection/stdio.rb', line 20 def command @command end |
#last_error ⇒ Object (readonly)
Returns the value of attribute last_error.
20 21 22 |
# File 'lib/legate/mcp/connection/stdio.rb', line 20 def last_error @last_error end |
Instance Method Details
#connect ⇒ Object
Connects to the MCP server process. Launches the command and starts threads to monitor stdout/stderr.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/legate/mcp/connection/stdio.rb', line 47 def connect return true if connected? Mcp.logger.info("Connecting via STDIO: #{@command} #{@args.join(' ')}") @last_error = nil stderr_pipe_read, stderr_pipe_write = IO.pipe begin # Use popen3 to capture stdin, stdout, stderr, and wait_thr @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(@command, *@args, err: stderr_pipe_write) @pid = @wait_thr.pid stderr_pipe_write.close # Close the write end in the parent Mcp.logger.debug("MCP process started with PID: #{@pid}") # Thread to read stderr and log/store errors @stderr_thread = Thread.new do stderr_pipe_read.each_line do |line| Mcp.logger.error("[MCP Server STDERR] #{line.chomp}") @last_error = line.chomp # Store last error line end rescue IOError => e Mcp.logger.debug("Stderr pipe closed: #{e.}") ensure stderr_pipe_read.close unless stderr_pipe_read.closed? end # Thread to continuously read stdout and parse JSON-RPC messages @stdout_reader_thread = Thread.new do Mcp.logger.debug("[Stdio Connection #{@pid}] stdout_reader_thread starting...") begin @stdout.each_line do |line| # Handle potential encoding issues from subprocess output line.force_encoding('UTF-8') line.scrub!('') # Remove invalid bytes line.strip! # Remove leading/trailing whitespace next if line.empty? # Skip empty lines Legate.logger.debug("<- [MCP Server STDOUT Raw] #{line}") # Attempt to parse only if it looks like JSON if line.start_with?('{') || line.start_with?('[') begin = JSON.parse(line, symbolize_names: true) Legate.logger.debug("[Stdio Connection #{@pid}] Received Parsed JSON:\n#{JSON.pretty_generate()}") @consecutive_parse_errors = 0 # Reset on successful parse # Route to correct queue based on ID presence/value if .key?(:id) && [:id].nil? # MCP Notifications might have null id @notification_queue << elsif .key?(:id) Legate.logger.debug("[Stdio Connection #{@pid}] Queuing response ID: #{[:id]}") @response_queue << # Responses have non-null id else # Assume notification for now @notification_queue << Legate.logger.warn("Received MCP message without explicit id: #{.inspect}") end rescue JSON::ParserError => e Legate.logger.error("Failed to parse potential MCP JSON from stdout: #{e.}. Line: #{line}") @consecutive_parse_errors += 1 if @consecutive_parse_errors >= PARSE_ERROR_THRESHOLD # Use >= for clarity Legate.logger.fatal("Too many consecutive JSON parse errors (#{PARSE_ERROR_THRESHOLD} reached). Assuming MCP connection broken.") @connected = false # Mark connection as broken @last_error = 'Too many consecutive JSON parse errors.' break # Stop reading from stdout end end else # Log lines that don't look like JSON instead of trying to parse Legate.logger.debug("Skipping non-JSON line from MCP STDOUT: #{line}") # Do not increment parse error count for these lines end end Legate.logger.info('MCP Server stdout stream ended.') rescue IOError => e Legate.logger.info("MCP Server stdout pipe closed: #{e.}") ensure @connected = false # Mark as disconnected if stdout closes or loop breaks due to errors Mcp.logger.debug("[Stdio Connection #{@pid}] stdout_reader_thread finished.") end end @connected = true Mcp.logger.info('MCP STDIO connection established.') true rescue Errno::ENOENT => e @last_error = "Command not found: #{@command}" Mcp.logger.error("#{@last_error} - #{e.}") raise ConnectionError, @last_error rescue StandardError => e @last_error = "Failed to start MCP process: #{e.}" Mcp.logger.error("#{@last_error}") # Clean up if process started partially disconnect(force: true) raise ConnectionError, @last_error end end |
#connected? ⇒ Boolean
40 41 42 |
# File 'lib/legate/mcp/connection/stdio.rb', line 40 def connected? @connected && @wait_thr&.alive? end |
#disconnect(force: false, timeout: 5) ⇒ Object
Disconnects from the server process. Terminates the process and cleans up threads.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/legate/mcp/connection/stdio.rb', line 213 def disconnect(force: false, timeout: 5) return unless @connected || @wait_thr # Only proceed if we have something to disconnect Mcp.logger.info("Disconnecting from MCP STDIO process (PID: #{@pid})...") @connected = false # Close stdin to signal EOF to the process @stdin&.close unless @stdin&.closed? # Close stdout/stderr BEFORE killing reader threads so they # unblock from IO.read and can exit cleanly @stdout&.close unless @stdout&.closed? @stderr&.close unless @stderr&.closed? # Join reader threads with timeout, then force-kill if stuck [@stdout_reader_thread, @stderr_thread].each do |thr| next unless thr&.alive? thr.join(2) || thr.kill end # Terminate the child process if @wait_thr&.pid pid = @wait_thr.pid begin Mcp.logger.debug("Sending SIGTERM to PID #{pid}...") Process.kill('TERM', pid) process_exited = @wait_thr.join(timeout) unless process_exited Mcp.logger.warn("MCP process PID #{pid} did not exit after SIGTERM and #{timeout}s timeout.") if force Mcp.logger.warn("Forcing shutdown with SIGKILL for PID #{pid}.") Process.kill('KILL', pid) @wait_thr.join(2) end end Mcp.logger.info("MCP process PID #{pid} terminated. Status: #{@wait_thr.value}") rescue Errno::ESRCH Mcp.logger.info("MCP process PID #{pid} already exited.") rescue StandardError => e Mcp.logger.debug("Caught StandardError during termination: #{e.class}") Mcp.logger.error("Error during process termination for PID #{pid}: #{e.}") end end @stdin = @stdout = @stderr = @wait_thr = @pid = nil @response_queue.clear @notification_queue.clear Mcp.logger.info('MCP STDIO connection closed.') end |
#next_request_id ⇒ Integer
Generates the next unique request ID.
267 268 269 |
# File 'lib/legate/mcp/connection/stdio.rb', line 267 def next_request_id @request_id_counter += 1 end |
#read_message(timeout = READ_TIMEOUT) ⇒ Hash?
Reads the next available response or notification. This is a low-level method; typically use Client methods which match request/response.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/legate/mcp/connection/stdio.rb', line 174 def (timeout = READ_TIMEOUT) raise ConnectionError, 'Not connected' unless connected? # Check both queues, prioritize responses if available begin @response_queue.pop(true) # non_block = true rescue ThreadError # Response queue empty, check notifications begin @notification_queue.pop(true) rescue ThreadError # Both empty, wait with timeout if specified return nil if timeout == 0 # Don't wait if timeout is 0 deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil loop do remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : 1.0 return nil if remaining <= 0 wait = [remaining, 0.5].min msg = @response_queue.pop(timeout: wait) return msg if msg begin return @notification_queue.pop(true) rescue ThreadError # notification queue empty too end raise ConnectionError, 'Connection lost while waiting for message' unless connected? end end end end |
#send_request(json_rpc_hash) ⇒ Object
Sends a JSON-RPC request object to the server process.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/legate/mcp/connection/stdio.rb', line 148 def send_request(json_rpc_hash) raise ConnectionError, 'Not connected' unless connected? begin request_json = json_rpc_hash.to_json Mcp.logger.debug("-> [MCP Client STDIN] #{request_json}") @stdin.puts(request_json) @stdin.flush # Ensure data is sent immediately rescue Errno::EPIPE => e @connected = false @last_error = "MCP process stdin pipe broke: #{e.}" Mcp.logger.error(@last_error) raise ConnectionError, @last_error rescue StandardError => e @connected = false @last_error = "Error writing to MCP process stdin: #{e.class} - #{e.}" Mcp.logger.error("#{@last_error}\n#{e.backtrace.join("\n")}") raise ConnectionError, @last_error end end |