Class: Legate::Mcp::Connection::Sse

Inherits:
Object
  • Object
show all
Defined in:
lib/legate/mcp/connection/sse.rb

Overview

Manages a connection to an MCP server via HTTP/SSE. Uses Server-Sent Events (SSE) for server-to-client notifications and standard HTTP POST for client-to-server requests/responses. Automatically reconnects with exponential backoff on stream drops.

Constant Summary collapse

MAX_RECONNECT_ATTEMPTS =
5
RECONNECT_BASE_DELAY =
1
RECONNECT_MAX_DELAY =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:) ⇒ Sse

Returns a new instance of Sse.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/legate/mcp/connection/sse.rb', line 23

# Builds the endpoint URIs for an MCP server reachable over HTTP/SSE and resets connection state.
#
# The path of +url+ is treated as a directory: a trailing "/" is appended when missing so that
# the "sse" and "messages" endpoints resolve beneath it instead of replacing its last segment.
#
# @param url [String] base URL of the MCP server
# @raise [URI::InvalidURIError] if +url+ cannot be parsed by URI.parse
def initialize(url:)
  # Keep the caller-supplied URL so the #url reader has something to return.
  @url = url

  @base_uri = URI.parse(url)
  @base_uri.path += '/' unless @base_uri.path.end_with?('/')

  @sse_uri = @base_uri + 'sse'
  @message_uri = @base_uri + 'messages'
  @sse_reader_thread = nil
  @connected = false
  @disconnecting = false
  @request_id_counter = 0
  @notification_queue = Queue.new
  @connect_signal = nil
  @last_error = nil
  Legate.logger.info("SSE Connection initialized for URL: #{@base_uri}, SSE: #{@sse_uri}, Msg: #{@message_uri}")
end

Instance Attribute Details

#last_error ⇒ Object (readonly)

Returns the value of attribute last_error.



21
22
23
# File 'lib/legate/mcp/connection/sse.rb', line 21

# Message describing the most recent failure recorded by this connection.
#
# @return [String, nil] the stored error message, or nil when none is recorded
def last_error = @last_error

#notification_queue ⇒ Object (readonly)

Returns the value of attribute notification_queue.



21
22
23
# File 'lib/legate/mcp/connection/sse.rb', line 21

# Queue from which #read_notification pops pending notifications.
#
# @return [Queue] the value of @notification_queue
def notification_queue = @notification_queue

#url ⇒ Object (readonly)

Returns the value of attribute url.



21
22
23
# File 'lib/legate/mcp/connection/sse.rb', line 21

# URL associated with this connection.
#
# @return [Object, nil] the value of @url; nil if it was never assigned
def url = @url

Instance Method Details

#connect ⇒ Object

Connects to the SSE endpoint and starts listening for notifications. Initial connection is synchronous — raises on failure. After a successful connection, stream drops trigger automatic reconnection.

Raises:

  • (ConnectionError)

    If the SSE connection is not established within the 10-second timeout.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/legate/mcp/connection/sse.rb', line 50

# Connects to the SSE endpoint and starts listening for notifications.
#
# Starts connection_loop on a new reader thread, then blocks for up to 10 seconds waiting for
# that thread to push :connected onto the connect-signal queue. Any other outcome (a different
# signal, or no signal before the timeout) is treated as a failed initial connection.
#
# @return [true] when already connected or once the reader thread reports :connected
# @raise [ConnectionError] if the connection is not established; the message is @last_error
#   when one was recorded, otherwise a generic timeout message
def connect
  return true if connected?

  Legate.logger.info("Connecting to SSE endpoint: #{@sse_uri}...")
  @disconnecting = false
  @last_error = nil
  @notification_queue.clear

  # Hold the queue in a local so the wait below does not depend on @connect_signal staying set.
  signal = Queue.new
  @connect_signal = signal

  @sse_reader_thread = Thread.new { connection_loop }

  result = signal.pop(timeout: 10)
  @connect_signal = nil
  return true if result == :connected

  # Capture the message before touching the thread, then stop the reader: without this a failed
  # connect would leave the loop running in the background and a later #connect would start a
  # second one alongside it.
  message = @last_error || 'Failed to establish SSE connection within timeout'
  @disconnecting = true
  @connected = false
  reader = @sse_reader_thread
  @sse_reader_thread = nil
  reader.kill.join(2) if reader&.alive?

  raise ConnectionError, message
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/legate/mcp/connection/sse.rb', line 42

# Whether the connection is flagged as connected and its reader thread is still running.
#
# Always returns a strict true/false (never nil), including when no reader thread exists.
#
# @return [Boolean]
def connected?
  return false unless @connected

  reader = @sse_reader_thread
  !reader.nil? && reader.alive?
end

#disconnect ⇒ Object

Disconnects the SSE stream and stops reconnection attempts.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/legate/mcp/connection/sse.rb', line 70

# Shuts the SSE connection down: flags the connection as disconnecting, kills the reader
# thread if it is still alive, and discards any queued notifications.
#
# Does nothing when the connection is neither flagged as connected nor holding a reader thread.
def disconnect
  reader = @sse_reader_thread
  return if !@connected && !reader

  Legate.logger.info('Disconnecting SSE connection...')
  @disconnecting = true
  @connected = false

  # Wait at most 2 seconds for the killed reader to finish.
  reader.kill.join(2) if reader&.alive?

  @sse_reader_thread = nil
  @notification_queue.clear
  Legate.logger.info('SSE connection disconnected.')
end

#next_request_id ⇒ Object



161
162
163
# File 'lib/legate/mcp/connection/sse.rb', line 161

# Advances the request-id counter by one and returns the new value.
#
# @return [Integer] the incremented counter
def next_request_id
  @request_id_counter = @request_id_counter.succ
end

#read_notification(timeout = 0.1) ⇒ Hash?

Reads the next notification from the queue.

Parameters:

  • timeout (Numeric, nil) (defaults to: 0.1)

    Seconds to wait, 0 for non-blocking.

Returns:

  • (Hash, nil)

    Notification hash or nil if queue is empty/timeout occurs.



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/legate/mcp/connection/sse.rb', line 149

# Pops the next notification off the queue.
#
# Returns nil straight away when the connection is not connected. With a timeout of 0 the pop
# is non-blocking; otherwise it waits up to +timeout+ seconds.
#
# @param timeout [Numeric, nil] seconds to wait, 0 for non-blocking
# @return [Hash, nil] the notification, or nil when nothing could be read
def read_notification(timeout = 0.1)
  return nil unless connected?

  if timeout == 0
    @notification_queue.pop(true)
  else
    @notification_queue.pop(timeout: timeout)
  end
rescue ThreadError
  # A non-blocking pop on an empty queue raises ThreadError; report it as "nothing to read".
  nil
end

#send_request(json_rpc_hash) ⇒ Hash

Sends a request via HTTP POST and returns the response immediately.

Parameters:

  • json_rpc_hash (Hash)

    The JSON-RPC request.

Returns:

  • (Hash)

    The parsed JSON-RPC response from the server.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/legate/mcp/connection/sse.rb', line 90

# Sends a JSON-RPC request via HTTP POST to the message endpoint and returns the parsed reply.
#
# A new Net::HTTP client is built for each call (5s open timeout, 15s read timeout). Every
# failure path below stores its message in @last_error before raising.
#
# @param json_rpc_hash [Hash] the JSON-RPC request; serialized with #to_json
# @return [Hash] the parsed JSON-RPC response with symbolized keys
# @raise [RemoteToolError] if a non-200 response has a JSON object body with an :error object
# @raise [ConnectionError] on any other non-200 response, on timeout/socket failures, or on an
#   unexpected error while sending
# @raise [ProtocolError] if a 200 response body is not valid JSON
def send_request(json_rpc_hash)
  request_json = json_rpc_hash.to_json
  Legate.logger.debug("-> [MCP Client POST] #{@message_uri} Body: #{request_json}")

  begin
    http = Net::HTTP.new(@message_uri.hostname, @message_uri.port)
    http.use_ssl = (@message_uri.scheme == 'https')
    http.open_timeout = 5
    http.read_timeout = 15

    request = Net::HTTP::Post.new(@message_uri.request_uri)
    request['Content-Type'] = 'application/json'
    request['Accept'] = 'application/json'
    request.body = request_json

    response = http.request(request)
    # Net::HTTP yields a nil body for body-less responses; normalise it so the slicing and
    # parsing below never operate on nil.
    body = response.body.to_s

    unless response.is_a?(Net::HTTPOK)
      msg = "MCP POST request failed: #{response.code} #{response.message}. Body: #{body[0..500]}"
      Legate.logger.error(msg)
      @last_error = msg
      begin
        error_details = JSON.parse(body, symbolize_names: true)
        # Only a JSON object carrying an :error object is a JSON-RPC error; arrays, scalars
        # and string errors fall through to the generic ConnectionError.
        error = error_details[:error] if error_details.is_a?(Hash)
        raise RemoteToolError.new(error[:message], error[:code], error[:data]) if error.is_a?(Hash)
      rescue JSON::ParserError
        # Body is not JSON
      end
      raise ConnectionError, msg
    end

    begin
      response_hash = JSON.parse(body, symbolize_names: true)
      Legate.logger.debug("<- [MCP Client POST Response] #{response_hash.inspect}")
      response_hash
    rescue JSON::ParserError => e
      msg = "Failed to parse MCP JSON response from POST: #{e.message}. Body: #{body[0..500]}"
      Legate.logger.error(msg)
      @last_error = msg
      raise ProtocolError, msg
    end
  rescue ConnectionError, Legate::Mcp::ProtocolError, Legate::Mcp::RemoteToolError
    # Errors raised deliberately above already carry the right message and @last_error;
    # pass them through instead of re-wrapping them as "Unexpected error".
    raise
  rescue Timeout::Error, Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError => e
    @last_error = "Failed to send POST to #{@message_uri}: #{e.class} - #{e.message}"
    Legate.logger.error(@last_error)
    raise ConnectionError, @last_error
  rescue StandardError => e
    @last_error = "Unexpected error during POST send_request: #{e.class} - #{e.message}"
    Legate.logger.error("#{@last_error}\n#{e.backtrace.join("\n")}")
    raise ConnectionError, @last_error
  end
end