Class: Biryani::Connection
- Inherits:
-
Object
- Object
- Biryani::Connection
- Defined in:
- lib/biryani/connection.rb
Overview
rubocop: disable Metrics/ClassLength
Class Method Summary collapse
- .default_settings ⇒ Hash<Integer, Integer>
- .do_send(io, frame, flush) ⇒ Object
- .handle_connection_window_update(window_update, send_window) ⇒ nil, ConnectionError
-
.handle_data(stream_id, data, recv_window, streams_ctx, decoder) ⇒ Array<WindowUpdate>, ConnectionError
rubocop: disable Metrics/AbcSize.
- .handle_goaway(_goaway) ⇒ Object
- .handle_headers(headers, ctx, decoder) ⇒ nil, ConnectionError
- .handle_ping(ping) ⇒ Ping?
- .handle_rst_stream(_rst_stream, ctx) ⇒ Object
- .handle_settings(settings, peer_settings, decoder, streams_ctx) ⇒ Settings
- .handle_stream_window_update(window_update, streams_ctx) ⇒ nil, StreamError
- .http_request(fragment, content, decoder) ⇒ HTTP::Request, ConnectionError
- .http_response(res, encoder) ⇒ String
- .read_http2_magic(io) ⇒ nil, Error
- .send_data(io, stream_id, data, send_window, max_frame_size, streams_ctx, data_buffer) ⇒ Object
- .send_headers(io, stream_id, fragment, only_headers, max_frame_size, streams_ctx) ⇒ Object
-
.transition_stream_state_recv(recv_frame, streams_ctx, data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size) ⇒ StreamContext, ...
rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity.
-
.transition_stream_state_send(send_frame, streams_ctx) ⇒ Boolean
Should close connection?.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#do_recv_dispatch(frame) ⇒ Array<Object>, ...
Frames or errors.
-
#handle_connection_frame(frame) ⇒ Array<Object>, ...
rubocop: disable Metrics/CyclomaticComplexity.
- #handle_response(io, res, stream_id) ⇒ Object
-
#handle_stream_frame(frame) ⇒ Array<Object>, ...
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/MethodLength.
-
#initialize(proc) ⇒ Connection
constructor
proc [Proc].
-
#recv_dispatch(io, obj) ⇒ Array<Object>, ...
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity.
- #recv_loop(io) ⇒ Object
- #select_loop(io) ⇒ Object
- #serve(io) ⇒ Object
Constructor Details
#initialize(proc) ⇒ Connection
proc [Proc]
21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/biryani/connection.rb', line 21 def initialize(proc) @sock = nil # Ractor::Port @proc = proc @streams_ctx = StreamsContext.new(proc) @encoder = HPACK::Encoder.new(4_096) @decoder = HPACK::Decoder.new(4_096) @send_window = Window.new(65_535) @recv_window = Window.new(65_535) @data_buffer = DataBuffer.new @settings = self.class.default_settings # Hash<Integer, Integer> @peer_settings = self.class.default_settings # Hash<Integer, Integer> @closed = false end |
Class Method Details
.default_settings ⇒ Hash<Integer, Integer>
486 487 488 489 490 491 492 493 494 495 496 |
# File 'lib/biryani/connection.rb', line 486 def self.default_settings # https://datatracker.ietf.org/doc/html/rfc9113#section-6.5.2 { SettingsID::SETTINGS_HEADER_TABLE_SIZE => 4_096, SettingsID::SETTINGS_ENABLE_PUSH => 1, SettingsID::SETTINGS_MAX_CONCURRENT_STREAMS => 0xffffffff, SettingsID::SETTINGS_INITIAL_WINDOW_SIZE => 65_535, SettingsID::SETTINGS_MAX_FRAME_SIZE => 16_384, SettingsID::SETTINGS_MAX_HEADER_LIST_SIZE => 0xffffffff } end |
.do_send(io, frame, flush) ⇒ Object
299 300 301 302 |
# File 'lib/biryani/connection.rb', line 299 def self.do_send(io, frame, flush) io.write(frame.to_binary_s) io.flush if flush end |
.handle_connection_window_update(window_update, send_window) ⇒ nil, ConnectionError
440 441 442 443 444 445 |
# File 'lib/biryani/connection.rb', line 440 def self.handle_connection_window_update(window_update, send_window) send_window.increase!(window_update.window_size_increment) return ConnectionError.new(ErrorCode::FLOW_CONTROL_ERROR, 'flow-control window exceeds 2^31-1') if send_window.length > 2**31 - 1 nil end |
.handle_data(stream_id, data, recv_window, streams_ctx, decoder) ⇒ Array<WindowUpdate>, ConnectionError
rubocop: disable Metrics/AbcSize
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/biryani/connection.rb', line 363 def self.handle_data(stream_id, data, recv_window, streams_ctx, decoder) ctx = streams_ctx[stream_id] return ConnectionError.new(ErrorCode::FLOW_CONTROL_ERROR, 'DATA Frame length exceeds flow-control window size') \ if recv_window.consume!(data.bytesize).negative? || ctx.recv_window.consume!(data.bytesize).negative? ctx.content << data if ctx.half_closed_remote? obj = http_request(ctx.fragment, ctx.content, decoder) return obj if Biryani.err?(obj) ctx << obj end window_updates = [] window_updates << Frame::WindowUpdate.new(0, recv_window.capacity - recv_window.length) if recv_window.length < recv_window.capacity / 2 window_updates << Frame::WindowUpdate.new(stream_id, ctx.recv_window.capacity - ctx.recv_window.length) if ctx.recv_window.length < ctx.recv_window.capacity / 2 window_updates end |
.handle_goaway(_goaway) ⇒ Object
434 |
# File 'lib/biryani/connection.rb', line 434 def self.handle_goaway(_goaway); end |
.handle_headers(headers, ctx, decoder) ⇒ nil, ConnectionError
388 389 390 391 392 393 394 395 396 397 398 |
# File 'lib/biryani/connection.rb', line 388 def self.handle_headers(headers, ctx, decoder) ctx.fragment << headers.fragment if ctx.half_closed_remote? obj = http_request(ctx.fragment, ctx.content, decoder) return obj if Biryani.err?(obj) ctx << obj end nil end |
.handle_ping(ping) ⇒ Ping?
429 430 431 |
# File 'lib/biryani/connection.rb', line 429 def self.handle_ping(ping) Frame::Ping.new(true, 0, ping.opaque) unless ping.ack? end |
.handle_rst_stream(_rst_stream, ctx) ⇒ Object
402 403 404 |
# File 'lib/biryani/connection.rb', line 402 def self.handle_rst_stream(_rst_stream, ctx) ctx.close end |
.handle_settings(settings, peer_settings, decoder, streams_ctx) ⇒ Settings
412 413 414 415 416 417 418 419 420 421 422 423 424 |
# File 'lib/biryani/connection.rb', line 412 def self.handle_settings(settings, peer_settings, decoder, streams_ctx) return nil if settings.ack? peer_settings.merge!(settings.setting) new_limit = peer_settings[SettingsID::SETTINGS_HEADER_TABLE_SIZE] decoder.limit!(new_limit) send_initial_window_size = peer_settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE] streams_ctx.each do |ctx| ctx.send_window.update!(send_initial_window_size) end Frame::Settings.new(true, 0, {}) end |
.handle_stream_window_update(window_update, streams_ctx) ⇒ nil, StreamError
451 452 453 454 455 456 457 |
# File 'lib/biryani/connection.rb', line 451 def self.handle_stream_window_update(window_update, streams_ctx) stream_id = window_update.stream_id streams_ctx[stream_id].send_window.increase!(window_update.window_size_increment) return StreamError.new(ErrorCode::FLOW_CONTROL_ERROR, stream_id, 'flow-control window exceeds 2^31-1') if streams_ctx[stream_id].send_window.length > 2**31 - 1 nil end |
.http_request(fragment, content, decoder) ⇒ HTTP::Request, ConnectionError
464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/biryani/connection.rb', line 464 def self.http_request(fragment, content, decoder) obj = decoder.decode(fragment) return obj if Biryani.err?(obj) fields = obj builder = HTTP::RequestBuilder.new err = builder.fields(fields) return err unless err.nil? builder.build(content) end |
.http_response(res, encoder) ⇒ String
481 482 483 |
# File 'lib/biryani/connection.rb', line 481 def self.http_response(res, encoder) HTTP::ResponseParser.new(res).parse(encoder) end |
.read_http2_magic(io) ⇒ nil, Error
350 351 352 353 |
# File 'lib/biryani/connection.rb', line 350 def self.read_http2_magic(io) s = io.read(CONNECTION_PREFACE_LENGTH) ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'invalid connection preface') if s != CONNECTION_PREFACE end |
.send_data(io, stream_id, data, send_window, max_frame_size, streams_ctx, data_buffer) ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/biryani/connection.rb', line 311 def self.send_data(io, stream_id, data, send_window, max_frame_size, streams_ctx, data_buffer) frames, remains = streams_ctx.sendable_datas(stream_id, data, send_window, max_frame_size) frames.each do |frame| do_send(io, frame, false) send_window.consume!(frame.length) streams_ctx[stream_id].send_window.consume!(frame.length) transition_stream_state_send(frame, streams_ctx) end data_buffer.store(stream_id, remains) unless remains.empty? end |
.send_headers(io, stream_id, fragment, only_headers, max_frame_size, streams_ctx) ⇒ Object
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/biryani/connection.rb', line 330 def self.send_headers(io, stream_id, fragment, only_headers, max_frame_size, streams_ctx) len = (fragment.bytesize + max_frame_size - 1) / max_frame_size frames = fragment.gsub(/.{1,#{max_frame_size}}/m).with_index.map do |s, index| end_headers = index == len - 1 if index.zero? Frame::Headers.new(end_headers, only_headers, stream_id, nil, nil, s, nil) else Frame::Continuation.new(end_headers, stream_id, s) end end frames.each do |frame| do_send(io, frame, false) transition_stream_state_send(frame, streams_ctx) end end |
.transition_stream_state_recv(recv_frame, streams_ctx, data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size) ⇒ StreamContext, ...
rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/biryani/connection.rb', line 257 def self.transition_stream_state_recv(recv_frame, streams_ctx, data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size) ctx = streams_ctx[stream_id] return StreamError.new(ErrorCode::PROTOCOL_ERROR, stream_id, 'exceed max concurrent streams') if ctx.nil? && streams_ctx.count_active + 1 > max_streams return ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'even-numbered stream identifier') if ctx.nil? && stream_id.even? return ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'new stream identifier is less than the existing stream identifiers') if ctx.nil? && streams_ctx.last_stream_id > stream_id if ctx.nil? ctx = streams_ctx.new_context(stream_id, send_initial_window_size, recv_initial_window_size) # An ideal implementation would wait the RTT before removing the stream. streams_ctx.remove_closed(data_buffer) end obj = ctx.state_transition!(recv_frame, :recv) return obj if Biryani.err?(obj) ctx end |
.transition_stream_state_send(send_frame, streams_ctx) ⇒ Boolean
Returns should close connection?.
281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/biryani/connection.rb', line 281 def self.transition_stream_state_send(send_frame, streams_ctx) stream_id = send_frame.stream_id typ = send_frame.f_type case typ when FrameType::SETTINGS, FrameType::PING false when FrameType::GOAWAY streams_ctx.close_all true else streams_ctx[stream_id].state_transition!(send_frame, :send) unless stream_id.zero? false end end |
Instance Method Details
#close ⇒ Object
237 238 239 |
# File 'lib/biryani/connection.rb', line 237 def close @closed = true end |
#closed? ⇒ Boolean
242 243 244 |
# File 'lib/biryani/connection.rb', line 242 def closed? @closed end |
#do_recv_dispatch(frame) ⇒ Array<Object>, ...
Returns frames or errors.
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/biryani/connection.rb', line 119 def do_recv_dispatch(frame) receiving_continuation_stream_id = @streams_ctx.receiving_continuation_stream_id return [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, "invalid frame type #{format('0x%02x', typ)} for stream identifier #{format('0x%02x', stream_id)}")] \ if !receiving_continuation_stream_id.nil? && frame.stream_id != receiving_continuation_stream_id if frame.stream_id.zero? handle_connection_frame(frame) else handle_stream_frame(frame) end end |
#handle_connection_frame(frame) ⇒ Array<Object>, ...
rubocop: disable Metrics/CyclomaticComplexity
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/biryani/connection.rb', line 135 def handle_connection_frame(frame) case frame.f_type when FrameType::DATA, FrameType::HEADERS, FrameType::PRIORITY, FrameType::RST_STREAM, FrameType::PUSH_PROMISE, FrameType::CONTINUATION [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, "invalid frame type #{format('0x%02x', typ)} for stream identifier 0x00")] when FrameType::SETTINGS obj = self.class.handle_settings(frame, @peer_settings, @decoder, @streams_ctx) return [] if obj.nil? settings_ack = obj [settings_ack] when FrameType::PING obj = self.class.handle_ping(frame) return [] if obj.nil? ping_ack = obj [ping_ack] when FrameType::GOAWAY self.class.handle_goaway(frame) [] when FrameType::WINDOW_UPDATE err = self.class.handle_connection_window_update(frame, @send_window) return [err] unless err.nil? max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE] @data_buffer.take!(@send_window, @streams_ctx, max_frame_size) # return DATA Frames else # ignore UNKNOWN Frame [] end end |
#handle_response(io, res, stream_id) ⇒ Object
230 231 232 233 234 235 |
# File 'lib/biryani/connection.rb', line 230 def handle_response(io, res, stream_id) fragment, data = self.class.http_response(res, @encoder) max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE] self.class.send_headers(io, stream_id, fragment, data.empty?, max_frame_size, @streams_ctx) self.class.send_data(io, stream_id, data, @send_window, max_frame_size, @streams_ctx, @data_buffer) unless data.empty? end |
#handle_stream_frame(frame) ⇒ Array<Object>, ...
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/MethodLength
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/biryani/connection.rb', line 175 def handle_stream_frame(frame) stream_id = frame.stream_id typ = frame.f_type return [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, "invalid frame type #{format('0x%02x', typ)} for stream identifier #{format('0x%02x', stream_id)}")] \ if [FrameType::SETTINGS, FrameType::PING, FrameType::GOAWAY].include?(typ) max_streams = @peer_settings[SettingsID::SETTINGS_MAX_CONCURRENT_STREAMS] send_initial_window_size = @peer_settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE] recv_initial_window_size = @settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE] obj = self.class.transition_stream_state_recv(frame, @streams_ctx, @data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size) return [obj] if Biryani.err?(obj) ctx = obj case typ when FrameType::DATA obj = self.class.handle_data(stream_id, frame.data, @recv_window, @streams_ctx, @decoder) return [obj] if Biryani.err?(obj) obj # return WINDOW_UPDATE Frames when FrameType::HEADERS, FrameType::CONTINUATION err = self.class.handle_headers(frame, ctx, @decoder) return [err] unless err.nil? [] when FrameType::PRIORITY # ignore PRIORITY Frame [] when FrameType::PUSH_PROMISE # TODO [] when FrameType::RST_STREAM self.class.handle_rst_stream(frame, ctx) [] when FrameType::WINDOW_UPDATE err = self.class.handle_stream_window_update(frame, @streams_ctx) return [err] unless err.nil? max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE] @data_buffer.take!(@send_window, @streams_ctx, max_frame_size) else # ignore UNKNOWN Frame [] end end |
#recv_dispatch(io, obj) ⇒ Array<Object>, ...
rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/biryani/connection.rb', line 90 def recv_dispatch(io, obj) if Biryani.err?(obj) reply_frame = Biryani.unwrap(obj, @streams_ctx.last_stream_id) self.class.do_send(io, reply_frame, true) close if self.class.transition_stream_state_send(reply_frame, @streams_ctx) elsif obj.length > @settings[SettingsID::SETTINGS_MAX_FRAME_SIZE] self.class.do_send(io, Frame::Goaway.new(@streams_ctx.last_stream_id, ErrorCode::FRAME_SIZE_ERROR, 'payload length greater than SETTINGS_MAX_FRAME_SIZE'), true) close else do_recv_dispatch(obj).each do |frame| reply_frame = Biryani.unwrap(frame, @streams_ctx.last_stream_id) self.class.do_send(io, reply_frame, true) if reply_frame.f_type == FrameType::WINDOW_UPDATE && reply_frame.stream_id.zero? @recv_window.increase!(reply_frame.window_size_increment) elsif reply_frame.f_type == FrameType::WINDOW_UPDATE @streams_ctx[reply_frame.stream_id].recv_window.increase!(reply_frame.window_size_increment) end close if self.class.transition_stream_state_send(reply_frame, @streams_ctx) end end end |
#recv_loop(io) ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/biryani/connection.rb', line 55 def recv_loop(io) Ractor.new(io, @sock = Ractor::Port.new) do |io_, sock_| loop do obj = Frame.read(io_) break if obj.nil? sock_.send(obj, move: true) end end end |
#select_loop(io) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/biryani/connection.rb', line 67 def select_loop(io) loop do break if @sock.closed? && @streams_ctx.empty? port, obj = Ractor.select(@sock, @streams_ctx.tx) if port == @sock recv_dispatch(io, obj) else res, stream_id = obj handle_response(io, res, stream_id) end break if closed? end end |
#serve(io) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/biryani/connection.rb', line 36 def serve(io) err = self.class.read_http2_magic(io) unless err.nil? self.class.do_send(io, err.goaway(@streams_ctx.last_stream_id), true) return end self.class.do_send(io, Frame::Settings.new(false, 0, {}), true) recv_loop(io.clone) select_loop(io) rescue StandardError => e puts e.backtrace self.class.do_send(io, Frame::Goaway.new(@streams_ctx.last_stream_id, ErrorCode::INTERNAL_ERROR, 'internal error'), true) ensure io.close_write end |