Class: GroqRuby::MCP::Transports::HttpStreamable

Inherits:
Object
  • Object
show all
Includes:
GroqRuby::MCP::Transport
Defined in:
lib/groq_ruby/mcp/transports/http_streamable.rb

Overview

HTTP Streamable transport (the post-stdio transport defined by the MCP spec from 2025-03-26 onward and required by 2025-11-25).

Single endpoint URL. Each outbound JSON-RPC frame is a ‘POST` whose response is either `application/json` (one frame) or `text/event-stream` (one or more frames followed by stream close). Notifications/responses get back a 202 with no body.

‘Mcp-Session-Id` is captured from the response to `initialize` and echoed on every subsequent request. `MCP-Protocol-Version` goes on every request.

PR 2 limitation: there is no long-lived ‘GET` SSE channel for server-initiated notifications. Server-initiated traffic that arrives interleaved on a `POST` SSE response is dispatched, but standalone notifications (e.g. `notifications/tools/list_changed` with no in-flight request) are not yet received. Adding the GET stream is part of the tasks-API follow-up.

Examples:

config = HttpServerConfig.new(name: "x", url: "https://example/mcp")
transport = Transports::HttpStreamable.start(config, protocol_version: "2025-11-25")
transport.on_message { |msg| ... }
transport.send_message({jsonrpc: "2.0", id: 1, method: "tools/list"})

Constant Summary collapse

DEFAULT_OPEN_TIMEOUT =
10
DEFAULT_READ_TIMEOUT =
60

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, protocol_version:, open_timeout: DEFAULT_OPEN_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT) ⇒ HttpStreamable

Returns a new instance of HttpStreamable.

Parameters:

  • config (HttpServerConfig)
  • protocol_version (String)
  • open_timeout (Numeric) (defaults to: DEFAULT_OPEN_TIMEOUT)
  • read_timeout (Numeric) (defaults to: DEFAULT_READ_TIMEOUT)


60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 60

def initialize(config, protocol_version:, open_timeout: DEFAULT_OPEN_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT)
  @config = config
  @protocol_version = protocol_version
  @open_timeout = open_timeout
  @read_timeout = read_timeout
  @uri = URI.parse(config.url)
  @on_message = nil
  @session_id = nil
  @session_mutex = Mutex.new
  @threads = []
  @threads_mutex = Mutex.new
  @stopped = false
end

Instance Attribute Details

#protocol_versionString

Returns protocol version sent in the ‘MCP-Protocol-Version` header. Mutable so Client can update it after negotiation.

Returns:

  • (String)

    protocol version sent in the ‘MCP-Protocol-Version` header. Mutable so Client can update it after negotiation.



50
51
52
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 50

def protocol_version
  @protocol_version
end

#session_idString? (readonly)

Returns session id captured from the initialize response, or ‘nil` if the server is stateless.

Returns:

  • (String, nil)

    session id captured from the initialize response, or ‘nil` if the server is stateless



54
55
56
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 54

def session_id
  @session_id
end

Class Method Details

.start(config, protocol_version:) ⇒ HttpStreamable

Construct and return a transport ready to #send_message. The underlying HTTP connection is not opened until the first send; this is a pure constructor by another name.

Parameters:

  • config (HttpServerConfig)
  • protocol_version (String)

    e.g. ‘“2025-11-25”`

Returns:



44
45
46
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 44

def self.start(config, protocol_version:)
  new(config, protocol_version: protocol_version)
end

Instance Method Details

#on_message(&block) ⇒ Object



81
82
83
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 81

def on_message(&block)
  @on_message = block
end

#send_message(message) ⇒ Object



74
75
76
77
78
79
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 74

def send_message(message)
  return if @stopped
  thread = Thread.new { post(message) }
  @threads_mutex.synchronize { @threads << thread }
  nil
end

#stopObject



85
86
87
88
89
90
# File 'lib/groq_ruby/mcp/transports/http_streamable.rb', line 85

def stop
  return if @stopped
  @stopped = true
  drain_threads
  delete_session if @session_id
end