Class: VectorMCP::Transport::HttpStream
- Inherits:
-
Object
- Object
- VectorMCP::Transport::HttpStream
- Defined in:
- lib/vector_mcp/transport/http_stream.rb,
lib/vector_mcp/transport/http_stream/event_store.rb,
lib/vector_mcp/transport/http_stream/stream_handler.rb,
lib/vector_mcp/transport/http_stream/session_manager.rb
Overview
Implements the Model Context Protocol transport over HTTP with streaming support according to the MCP specification for Streamable HTTP transport.
This transport supports:
-
Client-to-server communication via HTTP POST
-
Optional server-to-client streaming via Server-Sent Events (SSE)
-
Session management with Mcp-Session-Id headers
-
Resumable connections with event IDs and Last-Event-ID support
-
Bidirectional communication patterns
Endpoints:
-
POST /mcp - Client sends JSON-RPC requests
-
GET /mcp - Optional SSE streaming for server-initiated messages
-
DELETE /mcp - Session termination
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: EventStore, SessionManager, StreamHandler
Constant Summary collapse
- DEFAULT_HOST =
Default configuration values
"localhost"- DEFAULT_PORT =
8000- DEFAULT_PATH_PREFIX =
"/mcp"- DEFAULT_SESSION_TIMEOUT =
5 minutes
300- DEFAULT_EVENT_RETENTION =
Keep last 100 events for resumability
100- DEFAULT_REQUEST_TIMEOUT =
Default timeout for server-initiated requests
30- DEFAULT_MIN_THREADS =
4- DEFAULT_MAX_THREADS =
32- DEFAULT_ALLOWED_ORIGINS =
Default allowed origins — restrict to localhost by default for security.
%w[ http://localhost https://localhost http://127.0.0.1 https://127.0.0.1 http://[::1] https://[::1] ].freeze
Instance Attribute Summary collapse
-
#event_store ⇒ HttpStream::EventStore
readonly
private
Provides access to event store for internal components.
-
#host ⇒ String
readonly
The hostname or IP address the server will bind to.
-
#logger ⇒ Logger
readonly
The logger instance, shared with the server.
-
#path_prefix ⇒ String
readonly
The base URL path for MCP endpoints.
-
#port ⇒ Integer
readonly
The port number the server will listen on.
-
#server ⇒ VectorMCP::Server
readonly
The server instance this transport is bound to.
-
#session_manager ⇒ HttpStream::SessionManager
readonly
private
Provides access to session manager for internal components.
-
#stream_handler ⇒ HttpStream::StreamHandler
readonly
private
Provides access to stream handler for internal components.
Instance Method Summary collapse
-
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests (Rack interface).
-
#initialize(server, options = {}) ⇒ HttpStream
constructor
Initializes a new HTTP Stream transport.
-
#run ⇒ void
Starts the HTTP Stream transport.
-
#send_notification(method, params = nil) ⇒ Boolean
Sends a notification to the first available session.
-
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a notification to a specific session.
-
#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request compatible with Session expectations.
-
#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request to a specific session and waits for a response.
-
#stop ⇒ void
Stops the transport and cleans up resources.
Constructor Details
#initialize(server, options = {}) ⇒ HttpStream
Initializes a new HTTP Stream transport.
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 81 def initialize(server, = {}) @server = server @logger = server.logger initialize_configuration() initialize_components initialize_request_tracking initialize_object_pools initialize_server_state logger.info { "HttpStream transport initialized: #{@host}:#{@port}#{@path_prefix}" } end |
Instance Attribute Details
#event_store ⇒ HttpStream::EventStore (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to event store for internal components.
246 247 248 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 246 def event_store @event_store end |
#host ⇒ String (readonly)
The hostname or IP address the server will bind to
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def host @host end |
#logger ⇒ Logger (readonly)
The logger instance, shared with the server
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def logger @logger end |
#path_prefix ⇒ String (readonly)
The base URL path for MCP endpoints
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def path_prefix @path_prefix end |
#port ⇒ Integer (readonly)
The port number the server will listen on
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def port @port end |
#server ⇒ VectorMCP::Server (readonly)
The server instance this transport is bound to
45 46 47 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 45 def server @server end |
#session_manager ⇒ HttpStream::SessionManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to session manager for internal components.
240 241 242 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 240 def session_manager @session_manager end |
#stream_handler ⇒ HttpStream::StreamHandler (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides access to stream handler for internal components.
252 253 254 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 252 def stream_handler @stream_handler end |
Instance Method Details
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests (Rack interface). Routes requests to appropriate handlers based on path and method.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 109 def call(env) start_time = Time.now path = env["PATH_INFO"] method = env["REQUEST_METHOD"] transport_context = build_transport_context(env, method, path, start_time) logger.debug { "Processing HTTP request #{method} #{path}" } transport_context = execute_transport_hooks(:before_request, transport_context) raise transport_context.error if transport_context.error? return transport_context.result if transport_context.result response = route_request(path, method, env) transport_context.result = response transport_context = execute_transport_hooks(:after_response, transport_context) raise transport_context.error if transport_context.error? response = transport_context.result || response log_request_completion(method, path, start_time, response[0]) response rescue StandardError => e if transport_context transport_context.error = e transport_context = execute_transport_hooks(:on_transport_error, transport_context) return transport_context.result if transport_context.result end handle_request_error(method, path, e) end |
#run ⇒ void
This method returns an undefined value.
Starts the HTTP Stream transport. This method will block until the server is stopped.
98 99 100 101 102 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 98 def run start_puma_server rescue StandardError => e handle_fatal_error(e) end |
#send_notification(method, params = nil) ⇒ Boolean
Sends a notification to the first available session.
144 145 146 147 148 149 150 151 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 144 def send_notification(method, params = nil) # Find the first available session first_session = find_first_session return false unless first_session = build_notification(method, params) @stream_handler.(first_session, ) end |
#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean
Sends a notification to a specific session.
159 160 161 162 163 164 165 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 159 def send_notification_to_session(session_id, method, params = nil) session = @session_manager.get_session(session_id) return false unless session = build_notification(method, params) @stream_handler.(session, ) end |
#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request compatible with Session expectations. This method will block until a response is received or the timeout is reached. For HTTP transport, this requires finding an appropriate session with streaming connection.
177 178 179 180 181 182 183 184 185 186 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 177 def send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty? # Find the first session with streaming connection # In HTTP transport, we need an active streaming connection to send server-initiated requests streaming_session = find_streaming_session raise ArgumentError, "No streaming session available for server-initiated requests" unless streaming_session send_request_to_session(streaming_session.id, method, params, timeout: timeout) end |
#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object
Sends a server-initiated JSON-RPC request to a specific session and waits for a response. This method will block until a response is received or the timeout is reached.
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 198 def send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty? raise ArgumentError, "Session ID cannot be blank" if session_id.to_s.strip.empty? session = @session_manager.get_session(session_id) raise ArgumentError, "Session not found: #{session_id}" unless session raise ArgumentError, "Session must have streaming connection for server-initiated requests" unless session.streaming? request_id = generate_request_id request_payload = { jsonrpc: "2.0", id: request_id, method: method } request_payload[:params] = params if params setup_request_tracking(request_id) # Sending request to session # Send request via existing streaming connection unless @stream_handler.(session, request_payload) cleanup_request_tracking(request_id) raise VectorMCP::SamplingError, "Failed to send request to session #{session_id}" end response = wait_for_response(request_id, method, timeout) process_response(response, request_id, method) end |
#stop ⇒ void
This method returns an undefined value.
Stops the transport and cleans up resources.
227 228 229 230 231 232 233 234 |
# File 'lib/vector_mcp/transport/http_stream.rb', line 227 def stop logger.info { "Stopping HttpStream transport" } @running = false cleanup_all_pending_requests @session_manager.cleanup_all_sessions @puma_server&.stop logger.info { "HttpStream transport stopped" } end |