Class: RubynCode::MCP::SSETransport
- Inherits: Object
- Defined in: lib/rubyn_code/mcp/sse_transport.rb
Overview
Communicates with a remote MCP server via HTTP Server-Sent Events (SSE).
On #start!, the transport establishes a long-lived GET connection to the SSE endpoint. The server responds with an `endpoint` event containing the URL for JSON-RPC POST requests. Subsequent requests are sent via POST, and responses arrive as SSE events on the GET stream.
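To make the handshake concrete, the sketch below parses a raw SSE stream into events and picks out the `endpoint` event followed by a JSON-RPC response. The `parse_sse_events` helper, the sample stream, and the `/messages?session=abc` path are hypothetical and only illustrate the wire format; the transport's own listener is private and not shown on this page.

```ruby
require 'json'

# Hypothetical helper (not part of RubynCode): splits a raw SSE payload into
# { event:, data: } hashes. Events are separated by a blank line and each
# field is a "name: value" line.
def parse_sse_events(raw)
  raw.split(/\r?\n\r?\n/).filter_map do |block|
    event = 'message' # SSE default event type when no "event:" field is given
    data = []
    block.each_line(chomp: true) do |line|
      next if line.empty? || line.start_with?(':') # skip comment/keep-alive lines
      field, value = line.split(':', 2)
      value = value.to_s.delete_prefix(' ')
      case field
      when 'event' then event = value
      when 'data' then data << value
      end
    end
    next if data.empty? # an event without data is ignored

    { event: event, data: data.join("\n") }
  end
end

# Assumed sample stream: the endpoint announcement, then one JSON-RPC response.
stream = <<~SSE
  event: endpoint
  data: /messages?session=abc

  event: message
  data: {"jsonrpc":"2.0","id":1,"result":{"ok":true}}

SSE

events = parse_sse_events(stream)
endpoint = events.find { |e| e[:event] == 'endpoint' }[:data]
response = JSON.parse(events.last[:data])
```

Here `endpoint` holds the path that later POST requests would target, and `response` is the decoded JSON-RPC message whose `id` ties it back to a request.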
Defined Under Namespace
Classes: TimeoutError, TransportError
Constant Summary
- DEFAULT_TIMEOUT = 30 # seconds
Instance Method Summary
- #alive? ⇒ Boolean
  Checks whether the transport is connected.
- #initialize(url:, timeout: DEFAULT_TIMEOUT) ⇒ SSETransport (constructor)
  A new instance of SSETransport.
- #send_request(method, params = {}) ⇒ Hash
  Sends a JSON-RPC 2.0 request via HTTP POST and waits for the response.
- #start! ⇒ void
  Establishes the SSE connection and waits for the endpoint event.
- #stop! ⇒ void
  Closes the SSE connection and cleans up resources.
Constructor Details
#initialize(url:, timeout: DEFAULT_TIMEOUT) ⇒ SSETransport
Returns a new instance of SSETransport.
    # File 'lib/rubyn_code/mcp/sse_transport.rb', line 26
    def initialize(url:, timeout: DEFAULT_TIMEOUT)
      @url = url
      @timeout = timeout
      @request_id = 0
      @mutex = Mutex.new
      @post_endpoint = nil
      @pending_responses = {}
      @connected = false
      @sse_thread = nil
    end
Instance Method Details
#alive? ⇒ Boolean
Checks whether the transport is connected.
    # File 'lib/rubyn_code/mcp/sse_transport.rb', line 98
    def alive?
      @connected && @sse_thread&.alive?
    end
#send_request(method, params = {}) ⇒ Hash
Sends a JSON-RPC 2.0 request via HTTP POST and waits for the response.
    # File 'lib/rubyn_code/mcp/sse_transport.rb', line 66
    def send_request(method, params = {})
      raise TransportError, 'Transport is not connected' unless @connected

      id = next_request_id
      queue = Queue.new
      @mutex.synchronize { @pending_responses[id] = queue }

      request = {
        jsonrpc: '2.0',
        id: id,
        method: method,
        params: params
      }

      post_request(request)
      wait_for_response(id, queue)
    end
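The method above registers a `Queue` per request id, then blocks until the listener thread delivers the matching response. The private helpers `post_request` and `wait_for_response` are not shown on this page, so the following is only a minimal sketch of that correlation pattern. The `PendingResponses` class and its method names are hypothetical, and `Timeout.timeout` is an assumed way to bound the wait, not necessarily what the library does.

```ruby
require 'timeout'

# Hypothetical stand-in for the transport's bookkeeping; not the library's code.
class PendingResponses
  def initialize
    @mutex = Mutex.new
    @pending = {}
    @next_id = 0
  end

  # Allocates a request id, registers a queue for it, and returns [id, queue].
  def register
    @mutex.synchronize do
      @next_id += 1
      queue = Queue.new
      @pending[@next_id] = queue
      [@next_id, queue]
    end
  end

  # Called from the listener thread when a response with this id arrives.
  # Responses for unknown ids are dropped.
  def deliver(id, response)
    queue = @mutex.synchronize { @pending.delete(id) }
    queue&.push(response)
  end

  # Blocks until the response arrives; on timeout, forgets the request and
  # re-raises Timeout::Error.
  def wait(id, queue, timeout)
    Timeout.timeout(timeout) { queue.pop }
  rescue Timeout::Error
    @mutex.synchronize { @pending.delete(id) }
    raise
  end
end

pending = PendingResponses.new
id, queue = pending.register

# Simulate the SSE listener delivering the response from another thread.
listener = Thread.new do
  sleep 0.05
  pending.deliver(id, { 'jsonrpc' => '2.0', 'id' => id, 'result' => 'pong' })
end

response = pending.wait(id, queue, 1)
listener.join
```

Keying the queues by request id lets several callers wait concurrently on the same SSE stream, since each one only wakes when its own response arrives.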
#start! ⇒ void
This method returns an undefined value.
Establishes the SSE connection and waits for the endpoint event.
    # File 'lib/rubyn_code/mcp/sse_transport.rb', line 41
    def start!
      raise TransportError, 'Transport already started' if @connected

      @pending_responses = {}
      @sse_thread = Thread.new { run_sse_listener }

      # Wait for the endpoint event with a timeout
      deadline = Time.now + @timeout
      sleep(0.1) until @post_endpoint || Time.now > deadline

      unless @post_endpoint
        stop!
        raise TransportError, "MCP server did not provide an endpoint within #{@timeout}s"
      end

      @connected = true
    end
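The wait in `start!` is a poll-with-deadline loop: check a condition every 0.1 s until it holds or the timeout passes. As a minimal sketch of the same idea in isolation, the hypothetical `wait_until` helper below does this with the monotonic clock instead of `Time.now`, so that wall-clock adjustments cannot stretch or shorten the wait. This is an illustrative variant, not the library's implementation, and the endpoint value is made up.

```ruby
# Hypothetical helper: polls the block until it returns a truthy value or the
# timeout elapses. Returns the truthy value, or nil on timeout.
def wait_until(timeout:, interval: 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = yield
    return result if result
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

    sleep(interval)
  end
end

# Simulate a listener thread that publishes the endpoint shortly after start.
post_endpoint = nil
setter = Thread.new do
  sleep 0.05
  post_endpoint = '/messages?session=abc' # assumed sample value
end

found = wait_until(timeout: 1, interval: 0.01) { post_endpoint }
setter.join

# When the condition never holds, the helper gives up after the timeout.
missing = wait_until(timeout: 0.05, interval: 0.01) { nil }
```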
#stop! ⇒ void
This method returns an undefined value.
Closes the SSE connection and cleans up resources.
    # File 'lib/rubyn_code/mcp/sse_transport.rb', line 87
    def stop!
      @connected = false
      @sse_thread&.kill
      @sse_thread = nil
      @post_endpoint = nil
      @pending_responses.clear
    end