Class: RubynCode::MCP::SSETransport

Inherits:
Object
Defined in:
lib/rubyn_code/mcp/sse_transport.rb

Overview

Communicates with a remote MCP server via HTTP Server-Sent Events (SSE).

On #start!, the transport establishes a long-lived GET connection to the SSE endpoint. The server responds with an `endpoint` event containing the URL for JSON-RPC POST requests. Subsequent requests are sent via POST, and responses arrive as SSE events on the GET stream.
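
The event flow described above can be sketched as a small parser. This is only an illustration: `parse_sse_events` is a hypothetical helper, not part of RubynCode, and the sample stream (including the `/messages?session=abc` URL) is invented. It shows how an `endpoint` event and a later JSON-RPC response event might look on the GET stream.

```ruby
require 'json'

# Hypothetical helper (not part of RubynCode): splits raw SSE text into events.
# Events are separated by a blank line; each line is "field: value".
def parse_sse_events(raw)
  raw.split(/\r?\n\r?\n/).filter_map do |block|
    event = 'message' # default event type when no "event:" field is present
    data = []
    block.each_line(chomp: true) do |line|
      next if line.empty? || line.start_with?(':') # blank lines and comments
      field, value = line.split(':', 2)
      value = value.to_s.delete_prefix(' ')
      case field
      when 'event' then event = value
      when 'data' then data << value
      end
    end
    next if data.empty? # events without data are ignored

    { event: event, data: data.join("\n") }
  end
end

# Invented sample stream: first the endpoint event, then a JSON-RPC response.
stream = "event: endpoint\ndata: /messages?session=abc\n\n" \
         "event: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}\n\n"

events = parse_sse_events(stream)
post_path = events.first[:data]             # URL to use for POST requests
response = JSON.parse(events.last[:data])   # JSON-RPC response from the stream
```

In this sketch the first event supplies the POST URL and every later `message` event carries one JSON-RPC payload, which matches the flow the overview describes.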

Defined Under Namespace

Classes: TimeoutError, TransportError

Constant Summary

DEFAULT_TIMEOUT = 30 # seconds

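Instance Method Summary

  • #alive? ⇒ Boolean

    Checks whether the transport is connected.

  • #initialize(url:, timeout: DEFAULT_TIMEOUT) ⇒ SSETransport

    Returns a new instance of SSETransport.

  • #send_request(method, params = {}) ⇒ Hash

    Sends a JSON-RPC 2.0 request via HTTP POST and waits for the response.

  • #start! ⇒ void

    Establishes the SSE connection and waits for the endpoint event.

  • #stop! ⇒ void

    Closes the SSE connection and cleans up resources.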

Constructor Details

#initialize(url:, timeout: DEFAULT_TIMEOUT) ⇒ SSETransport

Returns a new instance of SSETransport.

Parameters:

  • url (String)

    the SSE endpoint URL of the MCP server

  • timeout (Integer) (defaults to: DEFAULT_TIMEOUT)

    default timeout in seconds per request



# File 'lib/rubyn_code/mcp/sse_transport.rb', line 26

def initialize(url:, timeout: DEFAULT_TIMEOUT)
  @url = url
  @timeout = timeout
  @request_id = 0
  @mutex = Mutex.new
  @post_endpoint = nil
  @pending_responses = {}
  @connected = false
  @sse_thread = nil
end

Instance Method Details

#alive? ⇒ Boolean

Checks whether the transport is connected and its SSE listener thread is still running.

Returns:

  • (Boolean)


# File 'lib/rubyn_code/mcp/sse_transport.rb', line 98

def alive?
  @connected && @sse_thread&.alive?
end
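
A small sketch of what the expression in #alive? evaluates to in different states. `AliveProbe` is a hypothetical stand-in that copies only that one expression; it is not part of RubynCode. Note that with `&.` the result can be `nil` rather than `false` when no listener thread exists, so callers may prefer to treat the value as truthy or falsy.

```ruby
# Hypothetical stand-in (not part of RubynCode) mirroring the alive? expression.
class AliveProbe
  def initialize(connected:, thread:)
    @connected = connected
    @sse_thread = thread
  end

  def alive?
    @connected && @sse_thread&.alive?
  end
end

finished = Thread.new {}.tap(&:join) # a thread that has already terminated
running = Thread.new { sleep }       # a thread that stays alive until killed

not_connected = AliveProbe.new(connected: false, thread: nil).alive?      # => false
no_thread     = AliveProbe.new(connected: true, thread: nil).alive?       # => nil
dead_thread   = AliveProbe.new(connected: true, thread: finished).alive?  # => false
live_thread   = AliveProbe.new(connected: true, thread: running).alive?   # => true

running.kill
running.join
```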

#send_request(method, params = {}) ⇒ Hash

Sends a JSON-RPC 2.0 request via HTTP POST and waits for the response.

Parameters:

  • method (String)

    the JSON-RPC method name

  • params (Hash) (defaults to: {})

    parameters for the request

Returns:

  • (Hash)

    the parsed JSON-RPC result

Raises:

  • (TransportError)

    if the transport is not connected


# File 'lib/rubyn_code/mcp/sse_transport.rb', line 66

def send_request(method, params = {})
  raise TransportError, 'Transport is not connected' unless @connected

  id = next_request_id
  queue = Queue.new
  @mutex.synchronize { @pending_responses[id] = queue }

  request = {
    jsonrpc: '2.0',
    id: id,
    method: method,
    params: params
  }

  post_request(request)
  wait_for_response(id, queue)
end
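
The listing above registers a Queue per request id and then calls `wait_for_response`, which is not shown on this page. The following is a minimal sketch of how such id-based correlation could work, under the assumption that the listener thread pushes each parsed response onto the queue registered for its id. `PendingResponses` and its methods are hypothetical names, and the timeout handling uses Ruby's standard `Timeout` module rather than whatever the library actually does.

```ruby
require 'timeout'

# Hypothetical sketch (not the library's implementation) of correlating
# JSON-RPC responses with requests by id, using one Queue per request.
class PendingResponses
  def initialize
    @mutex = Mutex.new
    @pending = {}
  end

  # Called by the requesting thread before the POST is sent.
  def register(id)
    queue = Queue.new
    @mutex.synchronize { @pending[id] = queue }
    queue
  end

  # Called by the listener thread for each response seen on the stream.
  # Returns true if a request was waiting for this id.
  def deliver(message)
    queue = @mutex.synchronize { @pending.delete(message['id']) }
    queue&.push(message)
    !queue.nil?
  end

  # Blocks until the response arrives; returns nil on timeout.
  def wait(id, queue, timeout)
    Timeout.timeout(timeout) { queue.pop }
  rescue Timeout::Error
    @mutex.synchronize { @pending.delete(id) } # drop the stale entry
    nil
  end
end

pending = PendingResponses.new
queue = pending.register(1)

# Simulated listener thread delivering the response for id 1.
listener = Thread.new do
  pending.deliver({ 'jsonrpc' => '2.0', 'id' => 1, 'result' => { 'ok' => true } })
end

response = pending.wait(1, queue, 2)
listener.join
```

Removing the entry on both delivery and timeout keeps the pending table from growing when a server never answers a request.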

#start! ⇒ void

This method returns an undefined value.

Establishes the SSE connection and waits for the endpoint event.

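Raises:

  • (TransportError)

    if the transport is already started, or if the server does not provide an endpoint within the timeout
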

# File 'lib/rubyn_code/mcp/sse_transport.rb', line 41

def start!
  raise TransportError, 'Transport already started' if @connected

  @pending_responses = {}
  @sse_thread = Thread.new { run_sse_listener }

  # Wait for the endpoint event with a timeout
  deadline = Time.now + @timeout
  sleep(0.1) until @post_endpoint || Time.now > deadline

  unless @post_endpoint
    stop!
    raise TransportError, "MCP server did not provide an endpoint within #{@timeout}s"
  end

  @connected = true
end
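
The wait loop in #start! polls until the endpoint is set or a wall-clock deadline passes. As a variation, not the library's code, the same polling pattern can be written against the monotonic clock, which is unaffected by system time adjustments. `wait_until` is a hypothetical helper and the endpoint value is invented for the demonstration.

```ruby
# Hypothetical helper (not part of RubynCode): polls the block until it
# returns a truthy value or the timeout elapses, using the monotonic clock.
def wait_until(timeout, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = yield
    return result if result
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

    sleep(interval)
  end
end

# Simulate a listener thread that publishes the endpoint shortly after start.
endpoint = nil
setter = Thread.new do
  sleep(0.05)
  endpoint = '/messages?session=abc' # invented value for illustration
end

found = wait_until(1) { endpoint }     # returns the endpoint once it is set
setter.join
missing = wait_until(0.05) { nil }     # returns nil after the timeout elapses
```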

#stop! ⇒ void

This method returns an undefined value.

Closes the SSE connection and cleans up resources.



# File 'lib/rubyn_code/mcp/sse_transport.rb', line 87

def stop!
  @connected = false
  @sse_thread&.kill
  @sse_thread = nil
  @post_endpoint = nil
  @pending_responses.clear
end