Class: Clacky::Mcp::StdioTransport

Inherits:
Transport
  • Object
show all
Defined in:
lib/clacky/mcp/stdio_transport.rb

Instance Method Summary collapse

Constructor Details

#initialize(name:, command:, args: [], env: {}, cwd: nil) ⇒ StdioTransport

Returns a new instance of StdioTransport.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/clacky/mcp/stdio_transport.rb', line 12

def initialize(name:, command:, args: [], env: {}, cwd: nil)
  @name    = name
  @command = command
  @args    = Array(args)
  @env     = env || {}
  @cwd     = cwd

  @stdin = @stdout = @stderr = nil
  @wait_thr = nil
  @reader_thr = nil
  @on_message = nil
  @lock = Monitor.new
  @stderr_buf = String.new
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/clacky/mcp/stdio_transport.rb', line 72

def alive?
  !!(@wait_thr && @wait_thr.alive?)
end

#on_message(&blk) ⇒ Object



86
87
88
# File 'lib/clacky/mcp/stdio_transport.rb', line 86

def on_message(&blk)
  @on_message = blk
end

#send_message(payload) ⇒ Object



76
77
78
79
80
81
82
83
84
# File 'lib/clacky/mcp/stdio_transport.rb', line 76

def send_message(payload)
  line = JSON.generate(payload) + "\n"
  @lock.synchronize do
    raise TransportError, "MCP server '#{@name}' stdin closed" if @stdin.nil? || @stdin.closed?
    @stdin.write(line)
  end
rescue Errno::EPIPE => e
  raise TransportError, "MCP server '#{@name}' stdin pipe broken: #{e.message}"
end

#startObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/clacky/mcp/stdio_transport.rb', line 27

def start
  full_env = ENV.to_h.merge(@env.transform_keys(&:to_s).transform_values(&:to_s))
  opts = { unsetenv_others: false }
  opts[:chdir] = @cwd if @cwd && File.directory?(@cwd)

  @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(full_env, @command, *@args, opts)
  @stdin.sync = true

  Thread.new do
    @stderr.each_line do |line|
      @lock.synchronize do
        @stderr_buf << line
        @stderr_buf.replace(@stderr_buf[-32_768, 32_768] || @stderr_buf) if @stderr_buf.bytesize > 65_536
      end
    end
  rescue IOError
  end

  start_reader
  self
rescue Errno::ENOENT => e
  raise TransportError, "MCP server '#{@name}' command not found: #{@command} (#{e.message})"
end

#stderr_tail(bytes: 4096) ⇒ Object



90
91
92
# File 'lib/clacky/mcp/stdio_transport.rb', line 90

def stderr_tail(bytes: 4096)
  @lock.synchronize { @stderr_buf[-bytes, bytes] || @stderr_buf.dup }
end

#stopObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/clacky/mcp/stdio_transport.rb', line 51

def stop
  @lock.synchronize do
    return unless @wait_thr&.alive?
    begin
      Process.kill("TERM", @wait_thr.pid)
    rescue Errno::ESRCH, Errno::EPERM
    end
    deadline = Time.now + 2
    sleep 0.05 while @wait_thr.alive? && Time.now < deadline
    if @wait_thr.alive?
      begin
        Process.kill("KILL", @wait_thr.pid)
      rescue Errno::ESRCH, Errno::EPERM
      end
    end
  ensure
    [@stdin, @stdout, @stderr].each { |io| io&.close rescue nil }
    @reader_thr&.kill rescue nil
  end
end