Class: Legate::Mcp::Connection::Sse
- Inherits:
-
Object
- Object
- Legate::Mcp::Connection::Sse
- Defined in:
- lib/legate/mcp/connection/sse.rb
Overview
Manages a connection to an MCP server via HTTP/SSE. Uses Server-Sent Events (SSE) for server-to-client notifications and standard HTTP POST for client-to-server requests/responses. Automatically reconnects with exponential backoff on stream drops.
Constant Summary collapse
- MAX_RECONNECT_ATTEMPTS =
5- RECONNECT_BASE_DELAY =
1- RECONNECT_MAX_DELAY =
30
Instance Attribute Summary collapse
-
#last_error ⇒ Object
readonly
Returns the value of attribute last_error.
-
#notification_queue ⇒ Object
readonly
Returns the value of attribute notification_queue.
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
-
#connect ⇒ Object
Connects to the SSE endpoint and starts listening for notifications.
- #connected? ⇒ Boolean
-
#disconnect ⇒ Object
Disconnects the SSE stream and stops reconnection attempts.
-
#initialize(url:) ⇒ Sse
constructor
A new instance of Sse.
- #next_request_id ⇒ Object
-
#read_notification(timeout = 0.1) ⇒ Hash?
Reads the next notification from the queue.
-
#send_request(json_rpc_hash) ⇒ Hash
Sends a request via HTTP POST and returns the response immediately.
Constructor Details
#initialize(url:) ⇒ Sse
Returns a new instance of Sse.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/legate/mcp/connection/sse.rb', line 23 def initialize(url:) base_uri = URI.parse(url) base_path = base_uri.path base_path += '/' unless base_path.end_with?('/') @base_uri = base_uri.dup @base_uri.path = base_path @sse_uri = @base_uri + 'sse' @message_uri = @base_uri + 'messages' @sse_reader_thread = nil @connected = false @disconnecting = false @request_id_counter = 0 @notification_queue = Queue.new @connect_signal = nil @last_error = nil Legate.logger.info("SSE Connection initialized for URL: #{@base_uri}, SSE: #{@sse_uri}, Msg: #{@message_uri}") end |
Instance Attribute Details
#last_error ⇒ Object (readonly)
Returns the value of attribute last_error.
21 22 23 |
# File 'lib/legate/mcp/connection/sse.rb', line 21 def last_error @last_error end |
#notification_queue ⇒ Object (readonly)
Returns the value of attribute notification_queue.
21 22 23 |
# File 'lib/legate/mcp/connection/sse.rb', line 21 def notification_queue @notification_queue end |
#url ⇒ Object (readonly)
Returns the value of attribute url.
21 22 23 |
# File 'lib/legate/mcp/connection/sse.rb', line 21 def url @url end |
Instance Method Details
#connect ⇒ Object
Connects to the SSE endpoint and starts listening for notifications. Initial connection is synchronous — raises on failure. After a successful connection, stream drops trigger automatic reconnection.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/legate/mcp/connection/sse.rb', line 50 def connect return true if connected? Legate.logger.info("Connecting to SSE endpoint: #{@sse_uri}...") @disconnecting = false @last_error = nil @notification_queue.clear @connect_signal = Queue.new @sse_reader_thread = Thread.new { connection_loop } result = @connect_signal.pop(timeout: 10) @connect_signal = nil raise ConnectionError, @last_error || 'Failed to establish SSE connection within timeout' unless result == :connected true end |
#connected? ⇒ Boolean
42 43 44 |
# File 'lib/legate/mcp/connection/sse.rb', line 42 def connected? @connected && @sse_reader_thread&.alive? end |
#disconnect ⇒ Object
Disconnects the SSE stream and stops reconnection attempts.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/legate/mcp/connection/sse.rb', line 70 def disconnect return unless @connected || @sse_reader_thread Legate.logger.info('Disconnecting SSE connection...') @disconnecting = true @connected = false if @sse_reader_thread&.alive? @sse_reader_thread.kill @sse_reader_thread.join(2) end @sse_reader_thread = nil @notification_queue.clear Legate.logger.info('SSE connection disconnected.') end |
#next_request_id ⇒ Object
161 162 163 |
# File 'lib/legate/mcp/connection/sse.rb', line 161 def next_request_id @request_id_counter += 1 end |
#read_notification(timeout = 0.1) ⇒ Hash?
Reads the next notification from the queue.
149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/legate/mcp/connection/sse.rb', line 149 def read_notification(timeout = 0.1) return nil unless connected? begin return @notification_queue.pop(true) if timeout == 0 @notification_queue.pop(timeout: timeout) rescue ThreadError nil end end |
#send_request(json_rpc_hash) ⇒ Hash
Sends a request via HTTP POST and returns the response immediately.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/legate/mcp/connection/sse.rb', line 90 def send_request(json_rpc_hash) request_json = json_rpc_hash.to_json Legate.logger.debug("-> [MCP Client POST] #{@message_uri} Body: #{request_json}") begin http = Net::HTTP.new(@message_uri.hostname, @message_uri.port) http.use_ssl = (@message_uri.scheme == 'https') http.open_timeout = 5 http.read_timeout = 15 request = Net::HTTP::Post.new(@message_uri.request_uri) request['Content-Type'] = 'application/json' request['Accept'] = 'application/json' request.body = request_json response = http.request(request) unless response.is_a?(Net::HTTPOK) msg = "MCP POST request failed: #{response.code} #{response.}. Body: #{response.body[0..500]}" Legate.logger.error(msg) @last_error = msg begin error_details = JSON.parse(response.body, symbolize_names: true) if error_details[:error] raise RemoteToolError.new(error_details[:error][:message], error_details[:error][:code], error_details[:error][:data]) end rescue JSON::ParserError # Body is not JSON end raise ConnectionError, msg end begin response_hash = JSON.parse(response.body, symbolize_names: true) Legate.logger.debug("<- [MCP Client POST Response] #{response_hash.inspect}") response_hash rescue JSON::ParserError => e msg = "Failed to parse MCP JSON response from POST: #{e.}. Body: #{response.body[0..500]}" Legate.logger.error(msg) @last_error = msg raise ProtocolError, msg end rescue Timeout::Error, Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError => e @last_error = "Failed to send POST to #{@message_uri}: #{e.class} - #{e.}" Legate.logger.error(@last_error) raise ConnectionError, @last_error rescue Legate::Mcp::ProtocolError, Legate::Mcp::RemoteToolError raise rescue StandardError => e @last_error = "Unexpected error during POST send_request: #{e.class} - #{e.}" Legate.logger.error("#{@last_error}\n#{e.backtrace.join("\n")}") raise ConnectionError, @last_error end end |