Class: Legate::Mcp::Connection::Stdio

Inherits:
Object
  • Object
show all
Defined in:
lib/legate/mcp/connection/stdio.rb

Overview

Manages a connection to an MCP server via STDIO.

Constant Summary collapse

PROCESS_START_TIMEOUT =

How long to wait for process startup or initial output

5
READ_TIMEOUT =

How long to wait when reading a response line

10
PARSE_ERROR_THRESHOLD =

Max consecutive JSON parse errors before considering connection broken

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(command:, args: []) ⇒ Stdio

Returns a new instance of Stdio.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/legate/mcp/connection/stdio.rb', line 22

def initialize(command:, args: [])
  @command = command
  @args = args
  @stdin = nil
  @stdout = nil
  @stderr = nil
  @wait_thr = nil
  @stderr_thread = nil
  @connected = false
  @request_id_counter = 0
  @response_queue = Queue.new # <-- Back to response_queue
  @notification_queue = Queue.new # <-- Back to notification_queue
  @stdout_reader_thread = nil
  @last_error = nil
  @pid = nil
  @consecutive_parse_errors = 0 # Initialize counter
end

Instance Attribute Details

#argsObject (readonly)

Returns the value of attribute args.



20
21
22
# File 'lib/legate/mcp/connection/stdio.rb', line 20

def args
  @args
end

#commandObject (readonly)

Returns the value of attribute command.



20
21
22
# File 'lib/legate/mcp/connection/stdio.rb', line 20

def command
  @command
end

#last_errorObject (readonly)

Returns the value of attribute last_error.



20
21
22
# File 'lib/legate/mcp/connection/stdio.rb', line 20

def last_error
  @last_error
end

Instance Method Details

#connectObject

Connects to the MCP server process. Launches the command and starts threads to monitor stdout/stderr.

Raises:

  • (ConnectionError)

    if the process fails to start or terminates unexpectedly.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/legate/mcp/connection/stdio.rb', line 47

def connect
  return true if connected?

  Mcp.logger.info("Connecting via STDIO: #{@command} #{@args.join(' ')}")
  @last_error = nil
  stderr_pipe_read, stderr_pipe_write = IO.pipe

  begin
    # Use popen3 to capture stdin, stdout, stderr, and wait_thr
    @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(@command, *@args, err: stderr_pipe_write)
    @pid = @wait_thr.pid
    stderr_pipe_write.close # Close the write end in the parent

    Mcp.logger.debug("MCP process started with PID: #{@pid}")

    # Thread to read stderr and log/store errors
    @stderr_thread = Thread.new do
      stderr_pipe_read.each_line do |line|
        Mcp.logger.error("[MCP Server STDERR] #{line.chomp}")
        @last_error = line.chomp # Store last error line
      end
    rescue IOError => e
      Mcp.logger.debug("Stderr pipe closed: #{e.message}")
    ensure
      stderr_pipe_read.close unless stderr_pipe_read.closed?
    end

    # Thread to continuously read stdout and parse JSON-RPC messages
    @stdout_reader_thread = Thread.new do
      Mcp.logger.debug("[Stdio Connection #{@pid}] stdout_reader_thread starting...")
      begin
        @stdout.each_line do |line|
          # Handle potential encoding issues from subprocess output
          line.force_encoding('UTF-8')
          line.scrub!('') # Remove invalid bytes
          line.strip! # Remove leading/trailing whitespace
          next if line.empty? # Skip empty lines

          Legate.logger.debug("<- [MCP Server STDOUT Raw] #{line}")

          # Attempt to parse only if it looks like JSON
          if line.start_with?('{') || line.start_with?('[')
            begin
              message = JSON.parse(line, symbolize_names: true)
              Legate.logger.debug("[Stdio Connection #{@pid}] Received Parsed JSON:\n#{JSON.pretty_generate(message)}")
              @consecutive_parse_errors = 0 # Reset on successful parse

              # Route to correct queue based on ID presence/value
              if message.key?(:id) && message[:id].nil? # MCP Notifications might have null id
                @notification_queue << message
              elsif message.key?(:id)
                Legate.logger.debug("[Stdio Connection #{@pid}] Queuing response ID: #{message[:id]}")
                @response_queue << message # Responses have non-null id
              else # Assume notification for now
                @notification_queue << message
                Legate.logger.warn("Received MCP message without explicit id: #{message.inspect}")
              end
            rescue JSON::ParserError => e
              Legate.logger.error("Failed to parse potential MCP JSON from stdout: #{e.message}. Line: #{line}")
              @consecutive_parse_errors += 1
              if @consecutive_parse_errors >= PARSE_ERROR_THRESHOLD # Use >= for clarity
                Legate.logger.fatal("Too many consecutive JSON parse errors (#{PARSE_ERROR_THRESHOLD} reached). Assuming MCP connection broken.")
                @connected = false # Mark connection as broken
                @last_error = 'Too many consecutive JSON parse errors.'
                break # Stop reading from stdout
              end
            end
          else
            # Log lines that don't look like JSON instead of trying to parse
            Legate.logger.debug("Skipping non-JSON line from MCP STDOUT: #{line}")
            # Do not increment parse error count for these lines
          end
        end
        Legate.logger.info('MCP Server stdout stream ended.')
      rescue IOError => e
        Legate.logger.info("MCP Server stdout pipe closed: #{e.message}")
      ensure
        @connected = false # Mark as disconnected if stdout closes or loop breaks due to errors
        Mcp.logger.debug("[Stdio Connection #{@pid}] stdout_reader_thread finished.")
      end
    end

    @connected = true
    Mcp.logger.info('MCP STDIO connection established.')
    true
  rescue Errno::ENOENT => e
    @last_error = "Command not found: #{@command}"
    Mcp.logger.error("#{@last_error} - #{e.message}")
    raise ConnectionError, @last_error
  rescue StandardError => e
    @last_error = "Failed to start MCP process: #{e.message}"
    Mcp.logger.error("#{@last_error}")
    # Clean up if process started partially
    disconnect(force: true)
    raise ConnectionError, @last_error
  end
end

#connected?Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/legate/mcp/connection/stdio.rb', line 40

def connected?
  @connected && @wait_thr&.alive?
end

#disconnect(force: false, timeout: 5) ⇒ Object

Disconnects from the server process. Terminates the process and cleans up threads.

Parameters:

  • force (Boolean) (defaults to: false)

    If true, use SIGKILL if SIGTERM fails.

  • timeout (Numeric) (defaults to: 5)

    Seconds to wait for graceful shutdown.



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/legate/mcp/connection/stdio.rb', line 213

def disconnect(force: false, timeout: 5)
  return unless @connected || @wait_thr # Only proceed if we have something to disconnect

  Mcp.logger.info("Disconnecting from MCP STDIO process (PID: #{@pid})...")
  @connected = false

  # Close stdin to signal EOF to the process
  @stdin&.close unless @stdin&.closed?

  # Close stdout/stderr BEFORE killing reader threads so they
  # unblock from IO.read and can exit cleanly
  @stdout&.close unless @stdout&.closed?
  @stderr&.close unless @stderr&.closed?

  # Join reader threads with timeout, then force-kill if stuck
  [@stdout_reader_thread, @stderr_thread].each do |thr|
    next unless thr&.alive?

    thr.join(2) || thr.kill
  end

  # Terminate the child process
  if @wait_thr&.pid
    pid = @wait_thr.pid
    begin
      Mcp.logger.debug("Sending SIGTERM to PID #{pid}...")
      Process.kill('TERM', pid)
      process_exited = @wait_thr.join(timeout)

      unless process_exited
        Mcp.logger.warn("MCP process PID #{pid} did not exit after SIGTERM and #{timeout}s timeout.")
        if force
          Mcp.logger.warn("Forcing shutdown with SIGKILL for PID #{pid}.")
          Process.kill('KILL', pid)
          @wait_thr.join(2)
        end
      end
      Mcp.logger.info("MCP process PID #{pid} terminated. Status: #{@wait_thr.value}")
    rescue Errno::ESRCH
      Mcp.logger.info("MCP process PID #{pid} already exited.")
    rescue StandardError => e
      Mcp.logger.debug("Caught StandardError during termination: #{e.class}")
      Mcp.logger.error("Error during process termination for PID #{pid}: #{e.message}")
    end
  end

  @stdin = @stdout = @stderr = @wait_thr = @pid = nil
  @response_queue.clear
  @notification_queue.clear
  Mcp.logger.info('MCP STDIO connection closed.')
end

#next_request_idInteger

Generates the next unique request ID.

Returns:

  • (Integer)


267
268
269
# File 'lib/legate/mcp/connection/stdio.rb', line 267

def next_request_id
  @request_id_counter += 1
end

#read_message(timeout = READ_TIMEOUT) ⇒ Hash?

Reads the next available response or notification. This is a low-level method; typically use Client methods which match request/response.

Parameters:

  • timeout (Numeric, nil) (defaults to: READ_TIMEOUT)

    Seconds to wait for a message, nil to wait indefinitely.

Returns:

  • (Hash, nil)

    The parsed JSON-RPC message, or nil if timeout occurs.

Raises:



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/legate/mcp/connection/stdio.rb', line 174

def read_message(timeout = READ_TIMEOUT)
  raise ConnectionError, 'Not connected' unless connected?

  # Check both queues, prioritize responses if available
  begin
    @response_queue.pop(true) # non_block = true
  rescue ThreadError
    # Response queue empty, check notifications
    begin
      @notification_queue.pop(true)
    rescue ThreadError
      # Both empty, wait with timeout if specified
      return nil if timeout == 0 # Don't wait if timeout is 0

      deadline = timeout ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout : nil
      loop do
        remaining = deadline ? deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) : 1.0
        return nil if remaining <= 0

        wait = [remaining, 0.5].min
        msg = @response_queue.pop(timeout: wait)
        return msg if msg

        begin
          return @notification_queue.pop(true)
        rescue ThreadError
          # notification queue empty too
        end

        raise ConnectionError, 'Connection lost while waiting for message' unless connected?
      end
    end
  end
end

#send_request(json_rpc_hash) ⇒ Object

Sends a JSON-RPC request object to the server process.

Parameters:

  • json_rpc_hash (Hash)

    The request hash (e.g., ‘2.0’, method: ‘…’, params: …, id: …)

Raises:



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/legate/mcp/connection/stdio.rb', line 148

def send_request(json_rpc_hash)
  raise ConnectionError, 'Not connected' unless connected?

  begin
    request_json = json_rpc_hash.to_json
    Mcp.logger.debug("-> [MCP Client STDIN] #{request_json}")
    @stdin.puts(request_json)
    @stdin.flush # Ensure data is sent immediately
  rescue Errno::EPIPE => e
    @connected = false
    @last_error = "MCP process stdin pipe broke: #{e.message}"
    Mcp.logger.error(@last_error)
    raise ConnectionError, @last_error
  rescue StandardError => e
    @connected = false
    @last_error = "Error writing to MCP process stdin: #{e.class} - #{e.message}"
    Mcp.logger.error("#{@last_error}\n#{e.backtrace.join("\n")}")
    raise ConnectionError, @last_error
  end
end