Class: Quicsilver::Transport::Connection

Inherits:
Object
  • Object
show all
Includes:
Protocol::ControlStreamParser
Defined in:
lib/quicsilver/transport/connection.rb

Constant Summary collapse

MSQUIC_INVALID_STATE =

MsQuic QUIC_STATUS_INVALID_STATE (POSIX errno ETOOMANYREFS = 0x59). Stream already shut down by peer — raised by StreamSend when the client has reset or closed the stream.

"0x59"

Constants included from Protocol::ControlStreamParser

Protocol::ControlStreamParser::HTTP2_SETTINGS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Protocol::ControlStreamParser

#parse_control_frames, #parse_peer_goaway, #parse_peer_settings

Constructor Details

#initialize(handle, data, max_header_size: nil) ⇒ Connection

Returns a new instance of Connection.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/quicsilver/transport/connection.rb', line 19

def initialize(handle, data, max_header_size: nil)
  @handle = handle
  @data = data
  @max_header_size = max_header_size
  @streams = {}
  @response_buffers = {}
  @mutex = Mutex.new

  # Client's control streams (received)
  @control_stream_id = nil
  @qpack_encoder_stream_id = nil
  @qpack_decoder_stream_id = nil

  # Server's control stream (sent)
  @server_control_stream = nil

  @settings = {}
  @settings_received = false
  @peer_goaway_id = nil
  @local_goaway_id = nil
  @stream_priorities = {}
end

Instance Attribute Details

#control_stream_idObject (readonly)

Returns the value of attribute control_stream_id.



14
15
16
# File 'lib/quicsilver/transport/connection.rb', line 14

def control_stream_id
  @control_stream_id
end

#dataObject (readonly)

Returns the value of attribute data.



13
14
15
# File 'lib/quicsilver/transport/connection.rb', line 13

def data
  @data
end

#handleObject (readonly)

Returns the value of attribute handle.



13
14
15
# File 'lib/quicsilver/transport/connection.rb', line 13

def handle
  @handle
end

#local_goaway_idObject (readonly)

Returns the value of attribute local_goaway_id.



16
17
18
# File 'lib/quicsilver/transport/connection.rb', line 16

def local_goaway_id
  @local_goaway_id
end

#peer_goaway_idObject (readonly)

Returns the value of attribute peer_goaway_id.



16
17
18
# File 'lib/quicsilver/transport/connection.rb', line 16

def peer_goaway_id
  @peer_goaway_id
end

#qpack_decoder_stream_idObject (readonly)

Returns the value of attribute qpack_decoder_stream_id.



14
15
16
# File 'lib/quicsilver/transport/connection.rb', line 14

def qpack_decoder_stream_id
  @qpack_decoder_stream_id
end

#qpack_encoder_stream_idObject (readonly)

Returns the value of attribute qpack_encoder_stream_id.



14
15
16
# File 'lib/quicsilver/transport/connection.rb', line 14

def qpack_encoder_stream_id
  @qpack_encoder_stream_id
end

#server_control_streamObject (readonly)

Returns the value of attribute server_control_stream.



15
16
17
# File 'lib/quicsilver/transport/connection.rb', line 15

def server_control_stream
  @server_control_stream
end

#stream_prioritiesObject (readonly)

Returns the value of attribute stream_priorities.



17
18
19
# File 'lib/quicsilver/transport/connection.rb', line 17

def stream_priorities
  @stream_priorities
end

#streamsObject (readonly)

Returns the value of attribute streams.



13
14
15
# File 'lib/quicsilver/transport/connection.rb', line 13

def streams
  @streams
end

Instance Method Details

#add_stream(stream) ⇒ Object

Stream Management ===



62
63
64
# File 'lib/quicsilver/transport/connection.rb', line 62

def add_stream(stream)
  @streams[stream.stream_id] = stream
end

#apply_stream_priority(stream, priority) ⇒ Object

Apply priority to a QUIC stream via MsQuic. MsQuic: 0 = lowest, 0xFFFF = highest. HTTP urgency: 0 = highest, 7 = lowest. Maps urgency into evenly spaced bands across the uint16 range. The priority is queued and applied on the MsQuic event thread.



274
275
276
277
278
279
280
281
# File 'lib/quicsilver/transport/connection.rb', line 274

def apply_stream_priority(stream, priority)
  handle = stream.respond_to?(:stream_handle) ? stream.stream_handle : nil
  return unless handle
  quic_priority = (7 - priority.urgency) * 0x2000
  Quicsilver.set_stream_priority(handle, quic_priority)
rescue => e
  Quicsilver.logger.debug("Failed to set stream priority: #{e.message}")
end

#buffer_data(stream_id, data) ⇒ Object

Data Handling ===



80
81
82
83
84
# File 'lib/quicsilver/transport/connection.rb', line 80

def buffer_data(stream_id, data)
  @mutex.synchronize do
    (@response_buffers[stream_id] ||= "".b) << data
  end
end

#complete_stream(stream_id, final_data) ⇒ Object



86
87
88
89
90
91
# File 'lib/quicsilver/transport/connection.rb', line 86

def complete_stream(stream_id, final_data)
  @mutex.synchronize do
    buffer = @response_buffers.delete(stream_id)
    (buffer || "".b) + (final_data || "".b)
  end
end

#critical_stream?(stream_id) ⇒ Boolean

Returns:

  • (Boolean)


283
284
285
286
287
# File 'lib/quicsilver/transport/connection.rb', line 283

def critical_stream?(stream_id)
  stream_id == @control_stream_id ||
    stream_id == @qpack_encoder_stream_id ||
    stream_id == @qpack_decoder_stream_id
end

#get_stream(stream_id) ⇒ Object



66
67
68
# File 'lib/quicsilver/transport/connection.rb', line 66

def get_stream(stream_id)
  @streams[stream_id]
end

#handle_unidirectional_stream(stream, fin: true) ⇒ Object



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/quicsilver/transport/connection.rb', line 216

def handle_unidirectional_stream(stream, fin: true)
  stream_id = stream.stream_id

  # Already known as critical stream — closure via FIN is an error
  if fin && critical_stream?(stream_id)
    raise Protocol::FrameError.new("Closure of critical stream", error_code: Protocol::H3_CLOSED_CRITICAL_STREAM)
  end

  data = stream.data
  return if data.empty?

  stream_type, type_len = Protocol.decode_varint(data.bytes, 0)
  return if type_len == 0
  payload = data[type_len..-1]

  case stream_type
  when 0x00
    set_control_stream(stream_id, payload)
    if fin
      raise Protocol::FrameError.new("Closure of critical stream", error_code: Protocol::H3_CLOSED_CRITICAL_STREAM)
    end
  when 0x01
    raise Protocol::FrameError, "Client must not send push streams"
  when 0x02
    raise Protocol::FrameError, "Duplicate QPACK encoder stream" if @qpack_encoder_stream_id
    @qpack_encoder_stream_id = stream_id
    if fin
      raise Protocol::FrameError.new("Closure of critical stream", error_code: Protocol::H3_CLOSED_CRITICAL_STREAM)
    end
  when 0x03
    raise Protocol::FrameError, "Duplicate QPACK decoder stream" if @qpack_decoder_stream_id
    @qpack_decoder_stream_id = stream_id
    if fin
      raise Protocol::FrameError.new("Closure of critical stream", error_code: Protocol::H3_CLOSED_CRITICAL_STREAM)
    end
  end
end

#receive_unidirectional_data(stream_id, data) ⇒ Object

Process incoming data on a unidirectional stream incrementally. Called on each RECEIVE event — control streams never send FIN.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/quicsilver/transport/connection.rb', line 157

def receive_unidirectional_data(stream_id, data)
  @mutex.synchronize do
    (@response_buffers[stream_id] ||= "".b) << data
  end

  buf = @mutex.synchronize { @response_buffers[stream_id] || "".b }
  return if buf.empty?

  # First time seeing this stream: identify stream type
  unless @uni_stream_types&.key?(stream_id)
    @uni_stream_types ||= {}
    stream_type, type_len = Protocol.decode_varint(buf.bytes, 0)
    return if type_len == 0  # need more data

    case stream_type
    when 0x00 # Control stream
      raise Protocol::FrameError, "Duplicate control stream" if @control_stream_id
      @control_stream_id = stream_id
      @uni_stream_types[stream_id] = :control
      # Remove the stream type byte from the buffer
      @mutex.synchronize { @response_buffers[stream_id] = (buf[type_len..] || "".b) }
    when 0x01
      raise Protocol::FrameError, "Client must not send push streams"
    when 0x02 # QPACK encoder stream
      raise Protocol::FrameError, "Duplicate QPACK encoder stream" if @qpack_encoder_stream_id
      @qpack_encoder_stream_id = stream_id
      @uni_stream_types[stream_id] = :qpack_encoder
      @mutex.synchronize { @response_buffers[stream_id] = (buf[type_len..] || "".b) }
    when 0x03 # QPACK decoder stream
      raise Protocol::FrameError, "Duplicate QPACK decoder stream" if @qpack_decoder_stream_id
      @qpack_decoder_stream_id = stream_id
      @uni_stream_types[stream_id] = :qpack_decoder
      @mutex.synchronize { @response_buffers[stream_id] = (buf[type_len..] || "".b) }
    else
      # Unknown unidirectional stream types MUST be ignored (RFC 9114 §6.2)
      @uni_stream_types[stream_id] = :unknown
      return
    end

    buf = @mutex.synchronize { @response_buffers[stream_id] || "".b }
  end

  stream_type = @uni_stream_types[stream_id]
  return if buf.empty?

  case stream_type
  when :control
    parse_control_frames(buf)
    # Clear parsed data from buffer
    @mutex.synchronize { @response_buffers[stream_id] = "".b }
  when :qpack_encoder
    validate_qpack_encoder_data(buf)
    @mutex.synchronize { @response_buffers[stream_id] = "".b }
  when :qpack_decoder
    validate_qpack_decoder_data(buf)
    @mutex.synchronize { @response_buffers[stream_id] = "".b }
  end
end

#remove_stream(stream_id) ⇒ Object



70
71
72
# File 'lib/quicsilver/transport/connection.rb', line 70

def remove_stream(stream_id)
  @streams.delete(stream_id)
end

#send_error(stream, status, message) ⇒ Object



143
144
145
146
147
148
149
150
151
# File 'lib/quicsilver/transport/connection.rb', line 143

def send_error(stream, status, message)
  body = ["#{status} #{message}"]
  encoder = Protocol::ResponseEncoder.new(status, { "content-type" => "text/plain" }, body)
  Quicsilver.send_stream(stream.stream_handle, encoder.encode, true)
rescue RuntimeError => e
  # Stream may have been reset by client - this is expected
  raise unless e.message.include?(MSQUIC_INVALID_STATE) || e.message.include?("StreamSend failed")
  Quicsilver.logger.debug("Stream send failed (client likely reset): #{e.message}")
end

#send_goaway(stream_id = nil) ⇒ Object

HTTP/3 Frames ===



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/quicsilver/transport/connection.rb', line 95

def send_goaway(stream_id = nil)
  return unless @server_control_stream

  stream_id ||= last_client_stream_id
  validate_goaway_id!(stream_id)

  @server_control_stream.send(Protocol.build_goaway_frame(stream_id))
  @local_goaway_id = stream_id
rescue ArgumentError
  raise  # Re-raise validation errors
rescue => e
  Quicsilver.logger.error("Failed to send GOAWAY: #{e.message}")
end

#send_informational(stream, status, headers) ⇒ Object

Send an informational (1xx) response before the final response. RFC 9114 §4.1: encoded as a HEADERS frame, no FIN.



118
119
120
121
122
123
124
# File 'lib/quicsilver/transport/connection.rb', line 118

def send_informational(stream, status, headers)
  data = Protocol::ResponseEncoder.encode_informational(status, headers)
  Quicsilver.send_stream(stream.stream_handle, data, false)
rescue RuntimeError => e
  raise unless e.message.include?(MSQUIC_INVALID_STATE) || e.message.include?("StreamSend failed")
  Quicsilver.logger.debug("Stream send failed (client likely reset): #{e.message}")
end

#send_response(stream, status, headers, body, head_request: false, trailers: nil) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/quicsilver/transport/connection.rb', line 126

def send_response(stream, status, headers, body, head_request: false, trailers: nil)
  body = [] if body.nil?
  encoder = Protocol::ResponseEncoder.new(status, headers, body, head_request: head_request, trailers: trailers)

  if body.respond_to?(:to_ary)
    Quicsilver.send_stream(stream.stream_handle, encoder.encode, true)
  else
    encoder.stream_encode do |frame_data, fin|
      Quicsilver.send_stream(stream.stream_handle, frame_data, fin) unless frame_data.empty? && !fin
    end
  end
rescue RuntimeError => e
  # Stream may have been reset by client - this is expected
  raise unless e.message.include?(MSQUIC_INVALID_STATE) || e.message.include?("StreamSend failed")
  Quicsilver.logger.debug("Stream send failed (client likely reset): #{e.message}")
end

#set_control_stream(stream_id, payload = nil) ⇒ Object



254
255
256
257
258
# File 'lib/quicsilver/transport/connection.rb', line 254

def set_control_stream(stream_id, payload = nil)
  raise Protocol::FrameError, "Duplicate control stream" if @control_stream_id
  @control_stream_id = stream_id
  parse_control_frames(payload) if payload && !payload.empty?
end

#settingsObject



260
261
262
# File 'lib/quicsilver/transport/connection.rb', line 260

def settings
  @settings
end

#setup_http3_streamsObject

Setup (called after connection established) ===



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/quicsilver/transport/connection.rb', line 44

def setup_http3_streams
  # Control stream (required)
  @server_control_stream = open_stream(unidirectional: true)
  @server_control_stream.send(Protocol.build_control_stream(max_field_section_size: @max_header_size))

  # QPACK encoder/decoder streams
  [0x02, 0x03].each do |type|
    stream = open_stream(unidirectional: true)
    stream.send([type].pack("C"))
  end

  # GREASE unidirectional stream (RFC 9297)
  stream = open_stream(unidirectional: true)
  stream.send(Protocol.encode_varint(Protocol.grease_id) + "GREASE".b)
end

#shutdown(error_code = 0) ⇒ Object

Shutdown ===



291
292
293
294
# File 'lib/quicsilver/transport/connection.rb', line 291

def shutdown(error_code = 0)
  send_goaway
  Quicsilver.connection_shutdown(@handle, error_code, false)
end

#stream_priority(stream_id) ⇒ Object

Get the priority for a stream. Returns default Priority if not set.



265
266
267
# File 'lib/quicsilver/transport/connection.rb', line 265

def stream_priority(stream_id)
  @stream_priorities[stream_id] || Protocol::Priority.new
end

#track_client_stream(stream_id) ⇒ Object



74
75
76
# File 'lib/quicsilver/transport/connection.rb', line 74

def track_client_stream(stream_id)
  @streams[stream_id] = true
end

#validate_goaway_id!(stream_id) ⇒ Object

RFC 9114 §5.2: GOAWAY IDs MUST NOT increase from a previous value.



110
111
112
113
114
# File 'lib/quicsilver/transport/connection.rb', line 110

def validate_goaway_id!(stream_id)
  if @local_goaway_id && stream_id > @local_goaway_id
    raise ArgumentError, "GOAWAY stream ID #{stream_id} exceeds previous #{@local_goaway_id}"
  end
end