Class: Manceps::Transport::StreamableHTTP

Inherits:
Base
  • Object
show all
Defined in:
lib/manceps/transport/streamable_http.rb

Overview

Streamable HTTP transport with persistent connections and SSE support.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#on_notification

Constructor Details

#initialize(url, auth:, timeout: nil) ⇒ StreamableHTTP

Returns a new instance of StreamableHTTP.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/manceps/transport/streamable_http.rb', line 13

def initialize(url, auth:, timeout: nil)
  super()
  @url = url
  @auth = auth
  @session_id = nil
  @last_event_id = nil
  @protocol_version = nil

  timeout_opts = timeout || {
    connect_timeout: Manceps.configuration.connect_timeout,
    request_timeout: Manceps.configuration.request_timeout
  }

  # httpx maintains persistent connections by default —
  # critical because MCP servers bind Mcp-Session-Id to the TCP connection
  @http = HTTPX.with(timeout: timeout_opts)
end

Instance Attribute Details

#protocol_version=(value) ⇒ Object (writeonly)

Sets the attribute protocol_version

Parameters:

  • value

    the value to set the attribute protocol_version to.



11
12
13
# File 'lib/manceps/transport/streamable_http.rb', line 11

def protocol_version=(value)
  @protocol_version = value
end

#session_idObject (readonly)

Returns the value of attribute session_id.



10
11
12
# File 'lib/manceps/transport/streamable_http.rb', line 10

def session_id
  @session_id
end

Instance Method Details

#closeObject



114
115
116
117
# File 'lib/manceps/transport/streamable_http.rb', line 114

def close
  @http.close if @http.respond_to?(:close)
  @session_id = nil
end

#listen(&block) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/manceps/transport/streamable_http.rb', line 92

def listen(&block)
  headers = base_headers.dup
  headers.delete('content-type')
  headers['accept'] = 'text/event-stream'

  response = @http.get(@url, headers: headers)
  handle_error_response(response)

  content_type = response.content_type&.mime_type.to_s
  return unless content_type.include?('text/event-stream')

  events = SSEParser.parse_events(response.body.to_s)
  events.each do |event|
    parsed = begin
      JSON.parse(event[:data])
    rescue StandardError
      next
    end
    block.call(parsed) if parsed['method']
  end
end

#notify(body) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/manceps/transport/streamable_http.rb', line 73

def notify(body)
  response = @http.post(@url, headers: base_headers, body: JSON.generate(body))
  handle_connection_error(response)
  handle_error_response(response) unless response.status == 202
rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::EHOSTUNREACH => e
  raise ConnectionError, e.message
end

#request(body) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/manceps/transport/streamable_http.rb', line 31

def request(body)
  response = @http.post(@url, headers: base_headers, body: JSON.generate(body))
  handle_connection_error(response)
  handle_error_response(response)
  capture_session_id(response)
  result = parse_response(response)
  track_event_ids_from_response(response)
  result
rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::EHOSTUNREACH => e
  raise ConnectionError, e.message
end

#request_streaming(body, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/manceps/transport/streamable_http.rb', line 43

def request_streaming(body, &block)
  response = @http.post(@url, headers: base_headers, body: JSON.generate(body))
  handle_connection_error(response)
  handle_error_response(response)
  capture_session_id(response)

  content_type = response.content_type&.mime_type.to_s

  if content_type.include?('text/event-stream')
    events = SSEParser.parse_events(response.body.to_s)
    track_event_ids(events)
    final_result = nil
    events.each do |event|
      parsed = begin
        JSON.parse(event[:data])
      rescue StandardError
        next
      end
      if parsed['result'] || parsed['error']
        final_result = parsed
      elsif block
        block.call(parsed)
      end
    end
    final_result || parse_response(response)
  else
    parse_response(response)
  end
end

#terminate_session(session_id) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/manceps/transport/streamable_http.rb', line 81

def terminate_session(session_id)
  headers = {}
  headers['mcp-session-id'] = session_id
  @auth.apply(headers)
  begin
    @http.delete(@url, headers: headers)
  rescue StandardError
    nil
  end
end