Class: VectorMCP::Transport::HttpStream::StreamHandler Private

Inherits:
Object
Defined in:
lib/vector_mcp/transport/http_stream/stream_handler.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Handles Server-Sent Events streaming for HTTP transport.

Manages:

  • SSE connection lifecycle

  • Event streaming with resumability

  • Last-Event-ID header processing

  • Connection health monitoring

Defined Under Namespace

Classes: StreamingConnection

Constant Summary

DEFAULT_RETRY_MS = 5000

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Default reconnection time in milliseconds sent before intentional disconnections.
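The page does not show how this value reaches the client. In the Server-Sent Events format, a reconnection delay is carried by a `retry` field, so one plausible use is sketched below. The module `RetryHintSketch` and the helper `format_retry_hint` are hypothetical names used only for illustration and are not part of VectorMCP.

```ruby
# Sketch only: RetryHintSketch and format_retry_hint are hypothetical,
# not part of the VectorMCP API.
module RetryHintSketch
  # Mirrors the documented DEFAULT_RETRY_MS value.
  DEFAULT_RETRY_MS = 5000

  # Builds an SSE block that carries only a "retry" field. The trailing
  # blank line terminates the block on the wire.
  def self.format_retry_hint(retry_ms = DEFAULT_RETRY_MS)
    "retry: #{Integer(retry_ms)}\n\n"
  end
end

# Default delay, and an explicit override in milliseconds.
default_hint = RetryHintSketch.format_retry_hint
custom_hint = RetryHintSketch.format_retry_hint(1500)
```

A client that honors the field would wait roughly that many milliseconds before reopening the stream after a disconnect.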

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(transport) ⇒ StreamHandler

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes a new stream handler.

Parameters:

  • transport (HttpStream)

    The parent transport instance



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 43

def initialize(transport)
  @transport = transport
  @logger = transport.logger
  @active_connections = Concurrent::Hash.new
end

Instance Attribute Details

#logger ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18

def logger
  @logger
end

#transport ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 18

def transport
  @transport
end

Instance Method Details

#active_connection_count ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Gets the number of active streaming connections.

Returns:

  • (Integer)

    Number of active connections



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 103

def active_connection_count
  @active_connections.size
end

#cleanup_all_connections ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Cleans up all active connections.



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 110

def cleanup_all_connections
  logger.info("Cleaning up all streaming connections: #{@active_connections.size}")

  @active_connections.each_value(&:close)

  @active_connections.clear
end

#handle_streaming_request(env, session) ⇒ Array

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handles a streaming request (GET request for SSE).

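Parameters:

  • env

    The Rack environment for the incoming request

  • session

    The session that the SSE stream belongs to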

Returns:

  • (Array)

    Rack response triplet for SSE



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 54

def handle_streaming_request(env, session)
  last_event_id = extract_last_event_id(env)
  stream_id = resolve_stream_id(session, last_event_id, :get)

  logger.info("Starting SSE stream for session #{session.id}")

  headers = build_sse_headers
  body = create_sse_stream(session, last_event_id, stream_id: stream_id)

  [200, headers, body]
end
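The private helpers `extract_last_event_id` and `build_sse_headers` are not shown on this page. The sketch below illustrates what such helpers might look like, assuming only standard Rack and SSE conventions: Rack exposes the `Last-Event-ID` request header as `HTTP_LAST_EVENT_ID`, and SSE responses use the `text/event-stream` content type. The `_sketch` method names are hypothetical stand-ins, not the library's implementation.

```ruby
# Sketch only: hypothetical stand-ins for the private helpers.

# Reads the Last-Event-ID request header from a Rack environment.
# Returns nil when the header is absent or empty.
def extract_last_event_id_sketch(env)
  value = env["HTTP_LAST_EVENT_ID"]
  value.nil? || value.empty? ? nil : value
end

# Minimal response headers for an SSE stream.
def build_sse_headers_sketch
  {
    "content-type" => "text/event-stream",
    "cache-control" => "no-cache"
  }
end

# A reconnecting client reports the last event it received.
env = { "REQUEST_METHOD" => "GET", "HTTP_LAST_EVENT_ID" => "42" }
last_event_id = extract_last_event_id_sketch(env)
headers = build_sse_headers_sketch
```

A handler can use the recovered ID to replay events the client missed before resuming live delivery, which is the resumability behavior the overview refers to.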

#send_message_to_session(session, message) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sends a message to a specific session.

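Parameters:

  • session

    The session that should receive the message

  • message

    The message to send; it is serialized with to_json before delivery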

Returns:

  • (Boolean)

    True if message was sent successfully



# File 'lib/vector_mcp/transport/http_stream/stream_handler.rb', line 71

def send_message_to_session(session, message)
  return false unless session.streaming?

  connection = select_connection_for_message(session, message)
  return false unless connection

  begin
    # Store event for resumability
    event_data = message.to_json
    event_id = @transport.event_store.store_event(event_data, "message",
                                                  session_id: session.id,
                                                  stream_id: connection.stream_id)

    # Send via SSE
    sse_event = format_sse_event(event_data, "message", event_id)
    connection.yielder << sse_event

    logger.debug("Message sent to session #{session.id}")

    true
  rescue StandardError => e
    logger.error("Error sending message to session #{session.id}: #{e.message}")

    # Mark connection as closed and clean up
    cleanup_connection(session, connection)
    false
  end
end
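The method above relies on a private `format_sse_event` helper whose body is not shown. As a rough illustration of the wire format it presumably produces, the sketch below builds an event from the standard SSE fields (`id`, `event`, `data`) and pushes it through an `Enumerator` yielder, as the `connection.yielder << sse_event` call suggests. The name `format_sse_event_sketch` and its exact field order are assumptions.

```ruby
require "json"

# Sketch only: a hypothetical stand-in for the private format_sse_event helper.
# Emits one "data:" line per line of payload, then a blank line to end the event.
def format_sse_event_sketch(data, type, event_id)
  lines = []
  lines << "id: #{event_id}" if event_id
  lines << "event: #{type}" if type
  data.to_s.split("\n", -1).each { |line| lines << "data: #{line}" }
  "#{lines.join("\n")}\n\n"
end

# Serialize a message and format it as an SSE event.
payload = { jsonrpc: "2.0", method: "ping" }.to_json
event = format_sse_event_sketch(payload, "message", "evt-1")

# An Enumerator can serve as a streaming body; each chunk goes through the yielder.
stream = Enumerator.new do |yielder|
  yielder << event
end
first_chunk = stream.first
```

Storing the event before sending it, as the method does, means the event ID embedded in the `id` field can later be matched against a client's `Last-Event-ID` header when it reconnects.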