Class: Clacky::Mcp::StdioTransport
- Defined in: lib/clacky/mcp/stdio_transport.rb
Instance Method Summary
- #alive? ⇒ Boolean
- #initialize(name:, command:, args: [], env: {}, cwd: nil) ⇒ StdioTransport (constructor): A new instance of StdioTransport.
- #on_message(&blk) ⇒ Object
- #send_message(payload) ⇒ Object
- #start ⇒ Object
- #stderr_tail(bytes: 4096) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(name:, command:, args: [], env: {}, cwd: nil) ⇒ StdioTransport
Returns a new instance of StdioTransport.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 12
#
# Records the launch configuration for a child process and sets all runtime
# state to "not started". Nothing is spawned here.
#
# @param name [String] label interpolated into error messages
# @param command [String] executable to launch
# @param args [Array, Object, nil] arguments for the command; coerced with
#   +Array()+, so +nil+ becomes +[]+ and a scalar becomes a one-element array
# @param env [Hash, nil] extra environment variables; +nil+ is treated as +{}+
# @param cwd [String, nil] working directory for the child, if any
def initialize(name:, command:, args: [], env: {}, cwd: nil)
  @name = name
  @command = command
  @args = Array(args)
  @env = env || {}
  @cwd = cwd

  # Pipes and threads stay nil until the process is started.
  @stdin = @stdout = @stderr = nil
  @wait_thr = nil
  @reader_thr = nil
  @on_message = nil

  # Monitor (re-entrant lock) guarding stdin writes and the stderr buffer.
  @lock = Monitor.new
  # Mutable buffer; String.new avoids a frozen literal.
  @stderr_buf = String.new
end
Instance Method Details
#alive? ⇒ Boolean
72 73 74 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 72
#
# Reports whether the wait thread stored in @wait_thr exists and is still
# alive.
#
# @return [Boolean] +false+ when @wait_thr is nil or has finished
def alive?
  # Double negation turns a nil (no wait thread) into a strict false.
  !!@wait_thr&.alive?
end
#on_message(&blk) ⇒ Object
86 87 88 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 86
#
# Registers the callback block, replacing any block stored earlier. Calling
# without a block clears the callback (stores nil).
#
# NOTE(review): the code that invokes @on_message is not visible here;
# presumably the reader thread calls it for each incoming message. Confirm.
#
# @yield the block to store as the message callback
# @return [Proc, nil] the stored block
def on_message(&blk)
  @on_message = blk
end
#send_message(payload) ⇒ Object
76 77 78 79 80 81 82 83 84 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 76
#
# Serializes +payload+ as one line of JSON (newline-terminated) and writes it
# to the child's stdin while holding @lock.
#
# @param payload [Object] any value JSON.generate can serialize
# @return [Integer] the value returned by the underlying +write+ call
# @raise [TransportError] if stdin is nil or closed, or the pipe is broken
def send_message(payload)
  line = "#{JSON.generate(payload)}\n"
  @lock.synchronize do
    raise TransportError, "MCP server '#{@name}' stdin closed" if @stdin.nil? || @stdin.closed?

    @stdin.write(line)
  end
rescue Errno::EPIPE => e
  # EPIPE means the other end of the pipe is gone; report it as a transport failure.
  raise TransportError, "MCP server '#{@name}' stdin pipe broken: #{e.message}"
end
#start ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 27
#
# Spawns the configured command with Open3.popen3, starts a background thread
# that drains the child's stderr into a bounded buffer, then calls
# +start_reader+.
#
# NOTE(review): +start_reader+ is defined elsewhere in the file; presumably it
# starts the stdout reader thread. Confirm.
#
# @return [self]
# @raise [TransportError] if the command cannot be found (Errno::ENOENT)
def start
  # The child inherits the current environment plus @env, with keys and values stringified.
  full_env = ENV.to_h.merge(@env.transform_keys(&:to_s).transform_values(&:to_s))
  opts = { unsetenv_others: false }
  # Only chdir when the configured directory actually exists.
  opts[:chdir] = @cwd if @cwd && File.directory?(@cwd)

  @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(full_env, @command, *@args, opts)
  @stdin.sync = true

  Thread.new do
    @stderr.each_line do |line|
      @lock.synchronize do
        @stderr_buf << line
        # Once the buffer passes 64 KiB, keep only the last 32_768 characters.
        @stderr_buf.replace(@stderr_buf[-32_768, 32_768] || @stderr_buf) if @stderr_buf.bytesize > 65_536
      end
    end
  rescue IOError
    # IOError while reading stderr simply ends the drain thread.
  end

  start_reader
  self
rescue Errno::ENOENT => e
  raise TransportError, "MCP server '#{@name}' command not found: #{@command} (#{e.message})"
end
#stderr_tail(bytes: 4096) ⇒ Object
90 91 92 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 90
#
# Returns the tail of the captured stderr output, read while holding @lock.
#
# The slice uses String#[] and therefore counts characters, not bytes, despite
# the keyword name. When the buffer holds fewer than +bytes+ characters the
# slice is nil and a copy of the whole buffer is returned instead.
#
# @param bytes [Integer] maximum number of trailing characters to return
# @return [String] a new string; never the internal buffer itself
def stderr_tail(bytes: 4096)
  @lock.synchronize do
    @stderr_buf[-bytes, bytes] || @stderr_buf.dup
  end
end
#stop ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/clacky/mcp/stdio_transport.rb', line 51
#
# Shuts the child process down: sends TERM, polls for up to two seconds for it
# to exit, then sends KILL if it is still alive. The three pipes are closed
# and the reader thread is killed in every case, including when no process is
# running.
#
# The whole sequence runs while holding @lock.
#
# @return [Object, nil] not meaningful
def stop
  @lock.synchronize do
    return unless @wait_thr&.alive?

    begin
      Process.kill("TERM", @wait_thr.pid)
    rescue Errno::ESRCH, Errno::EPERM
      # The process is already gone or cannot be signalled; fall through to cleanup.
    end

    # The monotonic clock keeps the grace period unaffected by wall-clock changes.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2
    sleep 0.05 while @wait_thr.alive? && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline

    if @wait_thr.alive?
      begin
        Process.kill("KILL", @wait_thr.pid)
      rescue Errno::ESRCH, Errno::EPERM
        # Same best-effort handling as for TERM.
      end
    end
  ensure
    # Best-effort cleanup: one failing close must not prevent the others.
    [@stdin, @stdout, @stderr].each do |io|
      io&.close
    rescue StandardError
      nil
    end

    begin
      @reader_thr&.kill
    rescue StandardError
      nil
    end
  end
end