Class: VectorMCP::Transport::HttpStream

Inherits:
Object
Defined in:
lib/vector_mcp/transport/http_stream.rb,
lib/vector_mcp/transport/http_stream/event_store.rb,
lib/vector_mcp/transport/http_stream/stream_handler.rb,
lib/vector_mcp/transport/http_stream/session_manager.rb

Overview

Implements the Model Context Protocol transport over HTTP with streaming support according to the MCP specification for Streamable HTTP transport.

This transport supports:

  • Client-to-server communication via HTTP POST

  • Optional server-to-client streaming via Server-Sent Events (SSE)

  • Session management with Mcp-Session-Id headers

  • Resumable connections with event IDs and Last-Event-ID support

  • Bidirectional communication patterns
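Resumability with event IDs and Last-Event-ID can be pictured as a small bounded buffer. The sketch below is only an illustration and is not the EventStore implementation: the class `ReplayBuffer`, its methods, and the numeric, increasing ID scheme are all assumptions made for the example.

```ruby
# Hypothetical bounded event buffer illustrating Last-Event-ID replay.
# Not the library's EventStore; names and the ID scheme are assumptions.
class ReplayBuffer
  Event = Struct.new(:id, :data)

  def initialize(retention = 100)
    @retention = retention
    @events = []
    @next_id = 1
    @mutex = Mutex.new
  end

  # Stores an event and evicts the oldest ones beyond the retention limit.
  def store(data)
    @mutex.synchronize do
      event = Event.new(@next_id, data)
      @next_id += 1
      @events << event
      @events.shift while @events.size > @retention
      event.id
    end
  end

  # Returns retained events newer than the ID sent in Last-Event-ID.
  def events_after(last_event_id)
    @mutex.synchronize { @events.select { |e| e.id > last_event_id.to_i } }
  end
end

buffer = ReplayBuffer.new(3)
5.times { |i| buffer.store("message #{i}") }
missed = buffer.events_after("3") # client reconnects with Last-Event-ID: 3
puts missed.map(&:data).inspect
```

A client that reconnects after the retention window has rolled over only receives the events still held in the buffer, which is the trade-off a fixed retention count implies.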

Endpoints:

  • POST /mcp - Client sends JSON-RPC requests

  • GET /mcp - Optional SSE streaming for server-initiated messages

  • DELETE /mcp - Session termination
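To make the POST endpoint concrete, here is a minimal client-side sketch that builds (but does not send) a JSON-RPC request using only the Ruby standard library. The helper `build_mcp_post`, the `ping` method, and the session ID value are illustrative assumptions; the path and port are the defaults documented on this page.

```ruby
require "json"
require "net/http"
require "uri"

# Builds a JSON-RPC POST for the /mcp endpoint without sending it.
# The helper name and example values are assumptions for illustration.
def build_mcp_post(uri, payload, session_id: nil)
  request = Net::HTTP::Post.new(uri)
  request["Content-Type"] = "application/json"
  # A Streamable HTTP client accepts either a JSON body or an SSE stream.
  request["Accept"] = "application/json, text/event-stream"
  request["Mcp-Session-Id"] = session_id if session_id
  request.body = JSON.generate(payload)
  request
end

uri = URI("http://localhost:8000/mcp")
payload = { jsonrpc: "2.0", id: 1, method: "ping" }
request = build_mcp_post(uri, payload, session_id: "example-session")

puts request.method
puts request.path
puts request.body
# To send it against a running server:
#   Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
```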


Examples:

Basic Usage

server = VectorMCP::Server.new("http-stream-server")
transport = VectorMCP::Transport::HttpStream.new(server, port: 8080)
server.run(transport: transport)

Defined Under Namespace

Classes: EventStore, SessionManager, StreamHandler

Constant Summary

DEFAULT_HOST = "localhost"

    Default configuration values

DEFAULT_PORT = 8000

DEFAULT_PATH_PREFIX = "/mcp"

DEFAULT_SESSION_TIMEOUT = 300

    5 minutes

DEFAULT_EVENT_RETENTION = 100

    Keep last 100 events for resumability

DEFAULT_REQUEST_TIMEOUT = 30

    Default timeout for server-initiated requests

DEFAULT_MIN_THREADS = 4

DEFAULT_MAX_THREADS = 32

DEFAULT_ALLOWED_ORIGINS =

    Default allowed origins, restricted to localhost by default for security.

%w[
  http://localhost
  https://localhost
  http://127.0.0.1
  https://127.0.0.1
  http://[::1]
  https://[::1]
].freeze
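As a rough illustration of how an allow-list like this could be applied to an incoming Origin header, consider the sketch below. It is not the transport's validation code: the helper `origin_allowed?`, the constant `SKETCH_ALLOWED_ORIGINS`, and the choice to ignore the port when comparing are assumptions made for the example.

```ruby
require "uri"

# Copy of the documented defaults, under a hypothetical name.
SKETCH_ALLOWED_ORIGINS = %w[
  http://localhost
  https://localhost
  http://127.0.0.1
  https://127.0.0.1
  http://[::1]
  https://[::1]
].freeze

# Hypothetical check: compares scheme and host, ignoring the port.
# The real transport may match origins differently.
def origin_allowed?(origin, allowed = SKETCH_ALLOWED_ORIGINS)
  return true if allowed.include?("*")

  uri = URI.parse(origin.to_s)
  return false unless uri.scheme && uri.host

  allowed.include?("#{uri.scheme}://#{uri.host}")
rescue URI::InvalidURIError
  false
end

puts origin_allowed?("http://localhost:3000")
puts origin_allowed?("https://evil.example")
puts origin_allowed?("https://evil.example", ["*"])
```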

Constructor Details

#initialize(server, options = {}) ⇒ HttpStream

Initializes a new HTTP Stream transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages

  • options (Hash) (defaults to: {})

    Configuration options for the transport

Options Hash (options):

  • :host (String) — default: "localhost"

    The hostname or IP to bind to

  • :port (Integer) — default: 8000

    The port to listen on

  • :path_prefix (String) — default: "/mcp"

    The base path for HTTP endpoints

  • :session_timeout (Integer) — default: 300

    Session timeout in seconds

  • :event_retention (Integer) — default: 100

    Number of events to retain for resumability

  • :min_threads (Integer) — default: 4

    Minimum Puma thread pool size

  • :max_threads (Integer) — default: 32

    Maximum Puma thread pool size

  • :allowed_origins (Array<String>)

    Allowed origins for CORS validation. Defaults to localhost origins only. Pass ["*"] to allow all origins (NOT recommended for production).
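The way caller-supplied options override the documented defaults can be sketched with a plain hash merge. This is an illustration only and does not reproduce `initialize_configuration`: the names `SKETCH_DEFAULTS` and `resolve_options` are hypothetical, and the thread-count check is an assumption added for the example.

```ruby
# Documented defaults collected under a hypothetical constant name.
SKETCH_DEFAULTS = {
  host: "localhost",
  port: 8000,
  path_prefix: "/mcp",
  session_timeout: 300,
  event_retention: 100,
  min_threads: 4,
  max_threads: 32
}.freeze

# Hypothetical helper: caller options win over defaults.
def resolve_options(options = {})
  config = SKETCH_DEFAULTS.merge(options)
  # Assumed sanity check, not taken from the library.
  if config[:min_threads] > config[:max_threads]
    raise ArgumentError, "min_threads must not exceed max_threads"
  end

  config
end

config = resolve_options(port: 8080, session_timeout: 600)
puts config[:host]
puts config[:port]
puts config[:session_timeout]
```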



# File 'lib/vector_mcp/transport/http_stream.rb', line 81

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  initialize_configuration(options)
  initialize_components
  initialize_request_tracking
  initialize_object_pools
  initialize_server_state

  logger.info { "HttpStream transport initialized: #{@host}:#{@port}#{@path_prefix}" }
end

Instance Attribute Details

#event_store ⇒ HttpStream::EventStore (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to event store for internal components.



# File 'lib/vector_mcp/transport/http_stream.rb', line 246

def event_store
  @event_store
end

#host ⇒ String (readonly)

The hostname or IP address the server will bind to

Returns:

  • (String)

    the current value of host



# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def host
  @host
end

#logger ⇒ Logger (readonly)

The logger instance, shared with the server

Returns:

  • (Logger)

    the current value of logger



# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def logger
  @logger
end

#path_prefix ⇒ String (readonly)

The base URL path for MCP endpoints

Returns:

  • (String)

    the current value of path_prefix



# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def path_prefix
  @path_prefix
end

#port ⇒ Integer (readonly)

The port number the server will listen on

Returns:

  • (Integer)

    the current value of port



# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def port
  @port
end

#server ⇒ VectorMCP::Server (readonly)

The server instance this transport is bound to

Returns:

  • (VectorMCP::Server)

    the current value of server


# File 'lib/vector_mcp/transport/http_stream.rb', line 45

def server
  @server
end

#session_manager ⇒ HttpStream::SessionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to session manager for internal components.



# File 'lib/vector_mcp/transport/http_stream.rb', line 240

def session_manager
  @session_manager
end

#stream_handler ⇒ HttpStream::StreamHandler (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides access to stream handler for internal components.



# File 'lib/vector_mcp/transport/http_stream.rb', line 252

def stream_handler
  @stream_handler
end

Instance Method Details

#call(env) ⇒ Array(Integer, Hash, Object)

Handles incoming HTTP requests (Rack interface). Routes requests to appropriate handlers based on path and method.

Parameters:

  • env (Hash)

    The Rack environment hash

Returns:

  • (Array(Integer, Hash, Object))

    Standard Rack response triplet



# File 'lib/vector_mcp/transport/http_stream.rb', line 109

def call(env)
  start_time = Time.now
  path = env["PATH_INFO"]
  method = env["REQUEST_METHOD"]
  transport_context = build_transport_context(env, method, path, start_time)

  logger.debug { "Processing HTTP request #{method} #{path}" }

  transport_context = execute_transport_hooks(:before_request, transport_context)
  raise transport_context.error if transport_context.error?
  return transport_context.result if transport_context.result

  response = route_request(path, method, env)
  transport_context.result = response
  transport_context = execute_transport_hooks(:after_response, transport_context)
  raise transport_context.error if transport_context.error?

  response = transport_context.result || response
  log_request_completion(method, path, start_time, response[0])
  response
rescue StandardError => e
  if transport_context
    transport_context.error = e
    transport_context = execute_transport_hooks(:on_transport_error, transport_context)
    return transport_context.result if transport_context.result
  end

  handle_request_error(method, path, e)
end
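The Rack contract used by `#call` (an env hash in, a status/headers/body triplet out) can be illustrated with a tiny stand-alone router. This sketch is not the transport's `route_request`: the class `SketchRouter`, the response bodies, and the status codes chosen here are assumptions for the example.

```ruby
# Hypothetical Rack-style router showing method/path dispatch.
# Status codes and bodies are placeholders, not the library's behavior.
class SketchRouter
  def initialize(path_prefix = "/mcp")
    @path_prefix = path_prefix
  end

  # Rack interface: takes the env hash, returns [status, headers, body].
  def call(env)
    return respond(404, "Not Found") unless env["PATH_INFO"] == @path_prefix

    case env["REQUEST_METHOD"]
    when "POST" then respond(200, "handle JSON-RPC request")
    when "GET" then respond(200, "open SSE stream")
    when "DELETE" then respond(204, "")
    else respond(405, "Method Not Allowed")
    end
  end

  private

  def respond(status, body)
    [status, { "content-type" => "text/plain" }, [body]]
  end
end

router = SketchRouter.new
status, _headers, body = router.call({ "REQUEST_METHOD" => "POST", "PATH_INFO" => "/mcp" })
puts status
puts body.first
```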

#run ⇒ void

This method returns an undefined value.

Starts the HTTP Stream transport. This method will block until the server is stopped.

Raises:

  • (StandardError)

    if there’s a fatal error during server startup



# File 'lib/vector_mcp/transport/http_stream.rb', line 98

def run
  start_puma_server
rescue StandardError => e
  handle_fatal_error(e)
end

#send_notification(method, params = nil) ⇒ Boolean

Sends a notification to the first available session.

Parameters:

  • method (String)

    The notification method name

  • params (Hash, Array, nil) (defaults to: nil)

    The notification parameters

Returns:

  • (Boolean)

    True if notification was sent successfully



# File 'lib/vector_mcp/transport/http_stream.rb', line 144

def send_notification(method, params = nil)
  # Find the first available session
  first_session = find_first_session
  return false unless first_session

  message = build_notification(method, params)
  @stream_handler.send_message_to_session(first_session, message)
end

#send_notification_to_session(session_id, method, params = nil) ⇒ Boolean

Sends a notification to a specific session.

Parameters:

  • session_id (String)

    The target session ID

  • method (String)

    The notification method name

  • params (Hash, Array, nil) (defaults to: nil)

    The notification parameters

Returns:

  • (Boolean)

    True if notification was sent successfully



# File 'lib/vector_mcp/transport/http_stream.rb', line 159

def send_notification_to_session(session_id, method, params = nil)
  session = @session_manager.get_session(session_id)
  return false unless session

  message = build_notification(method, params)
  @stream_handler.send_message_to_session(session, message)
end
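For orientation, a JSON-RPC notification differs from a request only in having no `id`, and on an SSE stream each message travels as one event frame. The sketch below shows both shapes; it does not reproduce the private `build_notification` helper or the StreamHandler's framing, and the function names `notification_payload` and `sse_frame` are hypothetical.

```ruby
require "json"

# Hypothetical builder: a JSON-RPC 2.0 notification carries no id.
def notification_payload(method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params
  message
end

# Hypothetical SSE framing: one event with an ID and a single data line.
# JSON.generate emits no newlines, so the payload fits on one data line.
def sse_frame(event_id, message)
  "id: #{event_id}\nevent: message\ndata: #{JSON.generate(message)}\n\n"
end

message = notification_payload("notifications/tools/list_changed")
print sse_frame(7, message)
```

The `id:` field is what a client later echoes back in Last-Event-ID when it reconnects.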

#send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object

Sends a server-initiated JSON-RPC request compatible with Session expectations. This method blocks until a response is received or the timeout is reached. Over HTTP, this requires a session with an active streaming connection; the first such session found is used.

Parameters:

  • method (String)

    The request method name

  • params (Hash, Array, nil) (defaults to: nil)

    The request parameters

  • timeout (Numeric) (defaults to: DEFAULT_REQUEST_TIMEOUT)

    How long to wait for a response, in seconds

Returns:

  • (Object)

    The result part of the client’s response

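Raises:

  • (ArgumentError)

    if method is blank or no session with a streaming connection is available

  • (VectorMCP::SamplingError)

    if the request cannot be sent to the session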


# File 'lib/vector_mcp/transport/http_stream.rb', line 177

def send_request(method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT)
  raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty?

  # Find the first session with streaming connection
  # In HTTP transport, we need an active streaming connection to send server-initiated requests
  streaming_session = find_streaming_session
  raise ArgumentError, "No streaming session available for server-initiated requests" unless streaming_session

  send_request_to_session(streaming_session.id, method, params, timeout: timeout)
end

#send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT) ⇒ Object

Sends a server-initiated JSON-RPC request to a specific session and waits for a response. This method will block until a response is received or the timeout is reached.

Parameters:

  • session_id (String)

    The target session ID

  • method (String)

    The request method name

  • params (Hash, Array, nil) (defaults to: nil)

    The request parameters

  • timeout (Numeric) (defaults to: DEFAULT_REQUEST_TIMEOUT)

    How long to wait for a response, in seconds

Returns:

  • (Object)

    The result part of the client’s response

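Raises:

  • (ArgumentError)

    if method or session_id is blank, the session is not found, or the session has no streaming connection

  • (VectorMCP::SamplingError)

    if the request cannot be sent to the session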


# File 'lib/vector_mcp/transport/http_stream.rb', line 198

def send_request_to_session(session_id, method, params = nil, timeout: DEFAULT_REQUEST_TIMEOUT)
  raise ArgumentError, "Method cannot be blank" if method.to_s.strip.empty?
  raise ArgumentError, "Session ID cannot be blank" if session_id.to_s.strip.empty?

  session = @session_manager.get_session(session_id)
  raise ArgumentError, "Session not found: #{session_id}" unless session

  raise ArgumentError, "Session must have streaming connection for server-initiated requests" unless session.streaming?

  request_id = generate_request_id
  request_payload = { jsonrpc: "2.0", id: request_id, method: method }
  request_payload[:params] = params if params

  setup_request_tracking(request_id)
  # Sending request to session

  # Send request via existing streaming connection
  unless @stream_handler.send_message_to_session(session, request_payload)
    cleanup_request_tracking(request_id)
    raise VectorMCP::SamplingError, "Failed to send request to session #{session_id}"
  end

  response = wait_for_response(request_id, method, timeout)
  process_response(response, request_id, method)
end
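The blocking behavior described above (register a request ID, send, then wait for the matching response or a timeout) can be sketched with a mutex and a condition variable. This is an illustration under assumptions, not the transport's `setup_request_tracking` or `wait_for_response`: the class `PendingResponses` is hypothetical, and raising `Timeout::Error` on expiry is a choice made for the example.

```ruby
require "timeout"

# Hypothetical tracker for in-flight server-initiated requests.
class PendingResponses
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @responses = {}
  end

  # Called by the thread that receives the client's reply.
  def resolve(request_id, response)
    @mutex.synchronize do
      @responses[request_id] = response
      @condition.broadcast
    end
  end

  # Blocks until the response arrives or the timeout (seconds) elapses.
  def wait_for(request_id, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @responses.key?(request_id)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, "no response for #{request_id}" if remaining <= 0

        # Loop guards against spurious wakeups and unrelated responses.
        @condition.wait(@mutex, remaining)
      end
      @responses.delete(request_id)
    end
  end
end

pending = PendingResponses.new
responder = Thread.new do
  sleep 0.05
  pending.resolve("req-1", { "result" => "ok" })
end
puts pending.wait_for("req-1", 1).inspect
responder.join
```

Recomputing the remaining time on every loop iteration keeps the overall wait bounded by the original timeout even if the thread is woken for a different request.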

#stop ⇒ void

This method returns an undefined value.

Stops the transport and cleans up resources.



# File 'lib/vector_mcp/transport/http_stream.rb', line 227

def stop
  logger.info { "Stopping HttpStream transport" }
  @running = false
  cleanup_all_pending_requests
  @session_manager.cleanup_all_sessions
  @puma_server&.stop
  logger.info { "HttpStream transport stopped" }
end