Class: Manceps::Transport::StreamableHTTP
- Defined in:
- lib/manceps/transport/streamable_http.rb
Overview
Streamable HTTP transport with persistent connections and SSE support.
Instance Attribute Summary collapse
-
#protocol_version ⇒ Object
writeonly
Sets the attribute protocol_version.
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(url, auth:, timeout: nil) ⇒ StreamableHTTP
constructor
A new instance of StreamableHTTP.
- #listen(&block) ⇒ Object
- #notify(body) ⇒ Object
- #request(body) ⇒ Object
- #request_streaming(body, &block) ⇒ Object
- #terminate_session(session_id) ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(url, auth:, timeout: nil) ⇒ StreamableHTTP
Returns a new instance of StreamableHTTP.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/manceps/transport/streamable_http.rb', line 13 def initialize(url, auth:, timeout: nil) super() @url = url @auth = auth @session_id = nil @last_event_id = nil @protocol_version = nil timeout_opts = timeout || { connect_timeout: Manceps.configuration.connect_timeout, request_timeout: Manceps.configuration.request_timeout } # httpx maintains persistent connections by default — # critical because MCP servers bind Mcp-Session-Id to the TCP connection @http = HTTPX.with(timeout: timeout_opts) end |
Instance Attribute Details
#protocol_version=(value) ⇒ Object (writeonly)
Sets the attribute protocol_version
11 12 13 |
# File 'lib/manceps/transport/streamable_http.rb', line 11 def protocol_version=(value) @protocol_version = value end |
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
10 11 12 |
# File 'lib/manceps/transport/streamable_http.rb', line 10 def session_id @session_id end |
Instance Method Details
#close ⇒ Object
114 115 116 117 |
# File 'lib/manceps/transport/streamable_http.rb', line 114 def close @http.close if @http.respond_to?(:close) @session_id = nil end |
#listen(&block) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/manceps/transport/streamable_http.rb', line 92 def listen(&block) headers = base_headers.dup headers.delete('content-type') headers['accept'] = 'text/event-stream' response = @http.get(@url, headers: headers) handle_error_response(response) content_type = response.content_type&.mime_type.to_s return unless content_type.include?('text/event-stream') events = SSEParser.parse_events(response.body.to_s) events.each do |event| parsed = begin JSON.parse(event[:data]) rescue StandardError next end block.call(parsed) if parsed['method'] end end |
#notify(body) ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/manceps/transport/streamable_http.rb', line 73 def notify(body) response = @http.post(@url, headers: base_headers, body: JSON.generate(body)) handle_connection_error(response) handle_error_response(response) unless response.status == 202 rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::EHOSTUNREACH => e raise ConnectionError, e. end |
#request(body) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/manceps/transport/streamable_http.rb', line 31 def request(body) response = @http.post(@url, headers: base_headers, body: JSON.generate(body)) handle_connection_error(response) handle_error_response(response) capture_session_id(response) result = parse_response(response) track_event_ids_from_response(response) result rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::EHOSTUNREACH => e raise ConnectionError, e. end |
#request_streaming(body, &block) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/manceps/transport/streamable_http.rb', line 43 def request_streaming(body, &block) response = @http.post(@url, headers: base_headers, body: JSON.generate(body)) handle_connection_error(response) handle_error_response(response) capture_session_id(response) content_type = response.content_type&.mime_type.to_s if content_type.include?('text/event-stream') events = SSEParser.parse_events(response.body.to_s) track_event_ids(events) final_result = nil events.each do |event| parsed = begin JSON.parse(event[:data]) rescue StandardError next end if parsed['result'] || parsed['error'] final_result = parsed elsif block block.call(parsed) end end final_result || parse_response(response) else parse_response(response) end end |
#terminate_session(session_id) ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/manceps/transport/streamable_http.rb', line 81 def terminate_session(session_id) headers = {} headers['mcp-session-id'] = session_id @auth.apply(headers) begin @http.delete(@url, headers: headers) rescue StandardError nil end end |