Class: Biryani::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/biryani/connection.rb

Overview

rubocop: disable Metrics/ClassLength

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(proc) ⇒ Connection

proc [Proc]



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/biryani/connection.rb', line 21

# Builds the per-connection state: the stream registry, HPACK encoder/decoder,
# connection-level send/receive windows, the buffer for DATA that cannot be
# sent yet, and the local and peer settings tables.
#
# @param proc [Proc] kept in @proc and also passed to StreamsContext.new
def initialize(proc)
  @closed = false
  @sock = nil # Ractor::Port
  @proc = proc

  # Each call to default_settings returns a fresh Hash, so the local and peer
  # tables can be mutated independently.
  @settings = self.class.default_settings # Hash<Integer, Integer>
  @peer_settings = self.class.default_settings # Hash<Integer, Integer>

  @send_window = Window.new(65_535)
  @recv_window = Window.new(65_535)
  @encoder = HPACK::Encoder.new(4_096)
  @decoder = HPACK::Decoder.new(4_096)

  @data_buffer = DataBuffer.new
  @streams_ctx = StreamsContext.new(proc)
end

Class Method Details

.default_settings ⇒ Hash<Integer, Integer>

Returns:

  • (Hash<Integer, Integer>)


486
487
488
489
490
491
492
493
494
495
496
# File 'lib/biryani/connection.rb', line 486

# Initial SETTINGS values, keyed by settings identifier.
# A new Hash is built on every call.
#
# @return [Hash<Integer, Integer>]
def self.default_settings
  # https://datatracker.ietf.org/doc/html/rfc9113#section-6.5.2
  [
    [SettingsID::SETTINGS_HEADER_TABLE_SIZE, 4_096],
    [SettingsID::SETTINGS_ENABLE_PUSH, 1],
    [SettingsID::SETTINGS_MAX_CONCURRENT_STREAMS, 0xffffffff],
    [SettingsID::SETTINGS_INITIAL_WINDOW_SIZE, 65_535],
    [SettingsID::SETTINGS_MAX_FRAME_SIZE, 16_384],
    [SettingsID::SETTINGS_MAX_HEADER_LIST_SIZE, 0xffffffff]
  ].to_h
end

.do_send(io, frame, flush) ⇒ Object

Parameters:

  • io (IO)
  • frame (Object)
  • flush (Boolean)


299
300
301
302
# File 'lib/biryani/connection.rb', line 299

# Serializes a frame with #to_binary_s and writes it to the IO.
#
# @param io [IO]
# @param frame [Object] anything responding to #to_binary_s
# @param flush [Boolean] when truthy, io.flush is called after the write
def self.do_send(io, frame, flush)
  bytes = frame.to_binary_s
  io.write(bytes)
  return unless flush

  io.flush
end

.handle_connection_window_update(window_update, send_window) ⇒ nil, ConnectionError

Parameters:

  • window_update (WindowUpdate)
  • send_window (Window)

Returns:



440
441
442
443
444
445
# File 'lib/biryani/connection.rb', line 440

# Applies a connection-level WINDOW_UPDATE to the send window.
# The window is increased first; the overflow check runs on the new length.
#
# @param window_update [WindowUpdate]
# @param send_window [Window]
#
# @return [nil, ConnectionError] FLOW_CONTROL_ERROR when the window grows past 2^31-1
def self.handle_connection_window_update(window_update, send_window)
  send_window.increase!(window_update.window_size_increment)
  overflowed = send_window.length > 2**31 - 1
  return nil unless overflowed

  ConnectionError.new(ErrorCode::FLOW_CONTROL_ERROR, 'flow-control window exceeds 2^31-1')
end

.handle_data(stream_id, data, recv_window, streams_ctx, decoder) ⇒ Array<WindowUpdate>, ConnectionError

rubocop: disable Metrics/AbcSize

Parameters:

  • stream_id (Integer)
  • data (String)
  • recv_window (Window)
  • streams_ctx (StreamsContext)
  • decoder (Decoder)

Returns:



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/biryani/connection.rb', line 363

# Accounts a received DATA payload against both receive windows, appends it to
# the stream's content, and builds the request once the stream reports
# half_closed_remote?.
#
# @param stream_id [Integer]
# @param data [String]
# @param recv_window [Window] connection-level receive window
# @param streams_ctx [StreamsContext]
# @param decoder [Decoder]
#
# @return [Array<WindowUpdate>, ConnectionError] WINDOW_UPDATE frames to send
#   (possibly empty), or an error object
def self.handle_data(stream_id, data, recv_window, streams_ctx, decoder)
  ctx = streams_ctx[stream_id]
  size = data.bytesize
  # The stream window is only consumed when the connection window did not go negative.
  if recv_window.consume!(size).negative? || ctx.recv_window.consume!(size).negative?
    return ConnectionError.new(ErrorCode::FLOW_CONTROL_ERROR, 'DATA Frame length exceeds flow-control window size')
  end

  ctx.content << data
  if ctx.half_closed_remote?
    request = http_request(ctx.fragment, ctx.content, decoder)
    return request if Biryani.err?(request)

    ctx << request
  end

  # Replenish any window that dropped below half of its capacity:
  # connection (stream 0) first, then this stream.
  [[0, recv_window], [stream_id, ctx.recv_window]].filter_map do |id, window|
    Frame::WindowUpdate.new(id, window.capacity - window.length) if window.length < window.capacity / 2
  end
end

.handle_goaway(_goaway) ⇒ Object

Parameters:

  • _goaway (Goaway)


434
# File 'lib/biryani/connection.rb', line 434

# Receiving GOAWAY currently triggers no action; the frame is ignored.
#
# @param _goaway [Goaway]
#
# @return [nil]
def self.handle_goaway(_goaway)
  nil
end

.handle_headers(headers, ctx, decoder) ⇒ nil, ConnectionError

Parameters:

Returns:



388
389
390
391
392
393
394
395
396
397
398
# File 'lib/biryani/connection.rb', line 388

# Appends a HEADERS/CONTINUATION fragment to the stream's header block and,
# once the stream reports half_closed_remote?, builds the request and pushes
# it onto the stream context.
#
# @param headers [Object] frame responding to #fragment
# @param ctx [StreamContext]
# @param decoder [Decoder]
#
# @return [nil, ConnectionError] nil on success, otherwise the error object
#   produced while building the request
def self.handle_headers(headers, ctx, decoder)
  ctx.fragment << headers.fragment
  return nil unless ctx.half_closed_remote?

  request = http_request(ctx.fragment, ctx.content, decoder)
  return request if Biryani.err?(request)

  ctx << request
  nil
end

.handle_ping(ping) ⇒ Ping?

Parameters:

  • ping (Ping)

Returns:

  • (Ping, nil)


429
430
431
# File 'lib/biryani/connection.rb', line 429

# Builds the PING acknowledgement for a received PING.
#
# @param ping [Ping]
#
# @return [Ping, nil] an ACK carrying the same opaque data, or nil when the
#   received frame is itself an ACK
def self.handle_ping(ping)
  return nil if ping.ack?

  Frame::Ping.new(true, 0, ping.opaque)
end

.handle_rst_stream(_rst_stream, ctx) ⇒ Object

Parameters:



402
403
404
# File 'lib/biryani/connection.rb', line 402

# Handles a received RST_STREAM by closing the stream context.
# The frame itself (including its error code) is not inspected.
#
# @param _rst_stream [Object] the received RST_STREAM frame (unused)
# @param ctx [Object] stream context; must respond to #close
#
# @return [Object] whatever ctx.close returns
def self.handle_rst_stream(_rst_stream, ctx)
  ctx.close
end

.handle_settings(settings, peer_settings, decoder, streams_ctx) ⇒ Settings?

Parameters:

  • settings (Settings)
  • peer_settings (Hash<Integer, Integer>)
  • decoder (Decoder)
  • streams_ctx (StreamsContext)

Returns:

  • (Settings, nil)


412
413
414
415
416
417
418
419
420
421
422
423
424
# File 'lib/biryani/connection.rb', line 412

# Applies a received SETTINGS frame: merges it into the peer settings table,
# pushes the resulting header-table size into the decoder, and updates every
# stream's send window with the resulting initial window size.
#
# NOTE(review): RFC 9113 section 6.5.2 describes SETTINGS_HEADER_TABLE_SIZE as
# the size of the table the *sender* uses to decode, which would normally
# bound the local encoder; here it is applied to `decoder` -- confirm intent.
#
# @param settings [Settings]
# @param peer_settings [Hash<Integer, Integer>] mutated in place
# @param decoder [Decoder]
# @param streams_ctx [StreamsContext]
#
# @return [Settings, nil] the SETTINGS ACK to send, or nil when the received
#   frame is itself an ACK
def self.handle_settings(settings, peer_settings, decoder, streams_ctx)
  return nil if settings.ack?

  peer_settings.merge!(settings.setting)
  decoder.limit!(peer_settings[SettingsID::SETTINGS_HEADER_TABLE_SIZE])

  initial_window = peer_settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE]
  streams_ctx.each { |ctx| ctx.send_window.update!(initial_window) }

  Frame::Settings.new(true, 0, {})
end

.handle_stream_window_update(window_update, streams_ctx) ⇒ nil, StreamError

Parameters:

Returns:



451
452
453
454
455
456
457
# File 'lib/biryani/connection.rb', line 451

# Applies a stream-level WINDOW_UPDATE to that stream's send window.
# The window is increased first; the overflow check runs on the new length.
#
# @param window_update [WindowUpdate]
# @param streams_ctx [StreamsContext]
#
# @return [nil, StreamError] FLOW_CONTROL_ERROR when the window grows past 2^31-1
def self.handle_stream_window_update(window_update, streams_ctx)
  stream_id = window_update.stream_id
  window = streams_ctx[stream_id].send_window
  window.increase!(window_update.window_size_increment)
  return nil if window.length <= 2**31 - 1

  StreamError.new(ErrorCode::FLOW_CONTROL_ERROR, stream_id, 'flow-control window exceeds 2^31-1')
end

.http_request(fragment, content, decoder) ⇒ HTTP::Request, ConnectionError

Parameters:

  • fragment (String)
  • content (String)
  • decoder (Decoder)

Returns:



464
465
466
467
468
469
470
471
472
473
474
# File 'lib/biryani/connection.rb', line 464

# Decodes a header block and assembles the request from the decoded fields
# plus the body.
#
# @param fragment [String] header block to decode
# @param content [String] request body passed to the builder
# @param decoder [Decoder]
#
# @return [HTTP::Request, ConnectionError] the built request, or the first
#   error produced by decoding or by the builder's field validation
def self.http_request(fragment, content, decoder)
  fields = decoder.decode(fragment)
  return fields if Biryani.err?(fields)

  builder = HTTP::RequestBuilder.new
  failure = builder.fields(fields)
  failure.nil? ? builder.build(content) : failure
end

.http_response(res, encoder) ⇒ String

Parameters:

Returns:

  • (String)

    fragment

  • (String)

    data



481
482
483
# File 'lib/biryani/connection.rb', line 481

# Runs the response through HTTP::ResponseParser with the given encoder.
# Callers in this file destructure the result as `fragment, data`.
#
# @param res [Object] response object handed to HTTP::ResponseParser.new
# @param encoder [Encoder]
#
# @return [Object] whatever HTTP::ResponseParser#parse returns
def self.http_response(res, encoder)
  parser = HTTP::ResponseParser.new(res)
  parser.parse(encoder)
end

.read_http2_magic(io) ⇒ nil, Error

Parameters:

  • io (IO)

Returns:

  • (nil, Error)


350
351
352
353
# File 'lib/biryani/connection.rb', line 350

# Reads CONNECTION_PREFACE_LENGTH bytes from the IO and compares them with the
# expected client connection preface.
#
# @param io [IO]
#
# @return [nil, Error] nil when the preface matches, otherwise a
#   PROTOCOL_ERROR ConnectionError (also when the read returns nil)
def self.read_http2_magic(io)
  preface = io.read(CONNECTION_PREFACE_LENGTH)
  return nil if preface == CONNECTION_PREFACE

  ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'invalid connection preface')
end

.send_data(io, stream_id, data, send_window, max_frame_size, streams_ctx, data_buffer) ⇒ Object

Parameters:



311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/biryani/connection.rb', line 311

# Writes as many DATA frames as streams_ctx.sendable_datas allows and stores
# whatever could not be sent in the data buffer.
#
# @param io [IO]
# @param stream_id [Integer]
# @param data [String]
# @param send_window [Window] connection-level send window
# @param max_frame_size [Integer]
# @param streams_ctx [StreamsContext]
# @param data_buffer [DataBuffer]
#
# @return [Object, nil] the result of data_buffer.store, or nil when nothing remained
def self.send_data(io, stream_id, data, send_window, max_frame_size, streams_ctx, data_buffer)
  sendable, leftover = streams_ctx.sendable_datas(stream_id, data, send_window, max_frame_size)

  sendable.each do |data_frame|
    do_send(io, data_frame, false)
    # Each frame written is charged to the connection window, then the stream window.
    send_window.consume!(data_frame.length)
    stream_window = streams_ctx[stream_id].send_window
    stream_window.consume!(data_frame.length)
    transition_stream_state_send(data_frame, streams_ctx)
  end

  return nil if leftover.empty?

  data_buffer.store(stream_id, leftover)
end

.send_headers(io, stream_id, fragment, only_headers, max_frame_size, streams_ctx) ⇒ Object

Parameters:

  • io (IO)
  • stream_id (Integer)
  • fragment (String)
  • only_headers (Boolean)
  • max_frame_size (Integer)
  • streams_ctx (StreamsContext)


330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/biryani/connection.rb', line 330

# Splits an encoded header block into one HEADERS frame followed by as many
# CONTINUATION frames as needed, writes them, and records each on the stream
# state machine. Only the last frame carries END_HEADERS.
#
# The block is cut by byte offset. The previous regexp-based split raised
# RegexpError once max_frame_size exceeded the regexp repeat limit (the peer
# may legally advertise up to 2^24-1), and counted characters rather than
# bytes for non-binary strings, so the frame count and the END_HEADERS
# position could disagree.
#
# @param io [IO]
# @param stream_id [Integer]
# @param fragment [String] encoded header block
# @param only_headers [Boolean] passed as the second HEADERS argument
#   (true when no DATA follows)
# @param max_frame_size [Integer] maximum payload bytes per frame
# @param streams_ctx [StreamsContext]
#
# @return [Array<Object>] the frames that were written (empty for an empty fragment)
def self.send_headers(io, stream_id, fragment, only_headers, max_frame_size, streams_ctx)
  len = (fragment.bytesize + max_frame_size - 1) / max_frame_size
  frames = Array.new(len) do |index|
    chunk = fragment.byteslice(index * max_frame_size, max_frame_size)
    end_headers = index == len - 1
    if index.zero?
      Frame::Headers.new(end_headers, only_headers, stream_id, nil, nil, chunk, nil)
    else
      Frame::Continuation.new(end_headers, stream_id, chunk)
    end
  end

  frames.each do |frame|
    do_send(io, frame, false)
    transition_stream_state_send(frame, streams_ctx)
  end
end

.transition_stream_state_recv(recv_frame, streams_ctx, data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size) ⇒ StreamContext, ...

rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity

Parameters:

  • recv_frame (Object)
  • streams_ctx (StreamsContext)
  • stream_id (Integer)
  • data_buffer (DataBuffer)
  • max_streams (Integer)
  • send_initial_window_size (Integer)
  • recv_initial_window_size (Integer)

Returns:



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/biryani/connection.rb', line 257

# Looks up (or creates) the context for the stream a frame arrived on and
# advances its state machine for a received frame.
#
# For a stream identifier with no context yet, the checks run in this order:
# concurrent-stream limit, even-numbered identifier, identifier lower than
# the last one seen. Only then is a new context created.
#
# @param recv_frame [Object]
# @param streams_ctx [StreamsContext]
# @param data_buffer [DataBuffer] passed to streams_ctx.remove_closed
# @param stream_id [Integer]
# @param max_streams [Integer]
# @param send_initial_window_size [Integer]
# @param recv_initial_window_size [Integer]
#
# @return [StreamContext, StreamError, ConnectionError] the stream context, or
#   an error object
def self.transition_stream_state_recv(recv_frame, streams_ctx, data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size)
  ctx = streams_ctx[stream_id]

  if ctx.nil?
    return StreamError.new(ErrorCode::PROTOCOL_ERROR, stream_id, 'exceed max concurrent streams') if streams_ctx.count_active + 1 > max_streams
    return ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'even-numbered stream identifier') if stream_id.even?
    return ConnectionError.new(ErrorCode::PROTOCOL_ERROR, 'new stream identifier is less than the existing stream identifiers') if streams_ctx.last_stream_id > stream_id

    ctx = streams_ctx.new_context(stream_id, send_initial_window_size, recv_initial_window_size)
    # An ideal implementation would wait the RTT before removing the stream.
    streams_ctx.remove_closed(data_buffer)
  end

  outcome = ctx.state_transition!(recv_frame, :recv)
  Biryani.err?(outcome) ? outcome : ctx
end

.transition_stream_state_send(send_frame, streams_ctx) ⇒ Boolean

Returns whether the connection should be closed.

Parameters:

Returns:

  • (Boolean)

    should close connection?



281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/biryani/connection.rb', line 281

# Records an outgoing frame on the stream state machine.
#
# SETTINGS and PING change nothing. GOAWAY closes every stream. Any other
# frame advances the state of its stream, unless it is addressed to stream 0.
#
# @param send_frame [Object] frame responding to #stream_id and #f_type
# @param streams_ctx [StreamsContext]
#
# @return [Boolean] true only after GOAWAY, i.e. the connection should be closed
def self.transition_stream_state_send(send_frame, streams_ctx)
  stream_id = send_frame.stream_id

  case send_frame.f_type
  when FrameType::GOAWAY
    streams_ctx.close_all
    return true
  when FrameType::SETTINGS, FrameType::PING
    return false
  end

  streams_ctx[stream_id].state_transition!(send_frame, :send) unless stream_id.zero?
  false
end

Instance Method Details

#close ⇒ Object



237
238
239
# File 'lib/biryani/connection.rb', line 237

# Marks the connection as closed. Only the @closed flag is set here; no IO is
# touched. #select_loop checks the flag via #closed? to stop.
#
# @return [Boolean] true (the assigned value)
def close
  @closed = true
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


242
243
244
# File 'lib/biryani/connection.rb', line 242

# Reports whether #close has been called on this connection.
#
# @return [Boolean] the current value of the @closed flag
def closed?
  @closed
end

#do_recv_dispatch(frame) ⇒ Array<Object>, ...

Returns frames or errors.

Parameters:

  • frame (Object)

Returns:



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/biryani/connection.rb', line 119

# Routes a received frame to the connection-level or stream-level handler.
#
# While a header block is still being received on some stream, a frame for
# any other stream identifier is rejected with a PROTOCOL_ERROR connection
# error instead of being dispatched.
#
# @param frame [Object] frame responding to #stream_id and #f_type
#
# @return [Array<Object>] frames to send and/or error objects
def do_recv_dispatch(frame)
  stream_id = frame.stream_id
  receiving_continuation_stream_id = @streams_ctx.receiving_continuation_stream_id
  if !receiving_continuation_stream_id.nil? && stream_id != receiving_continuation_stream_id
    # The message is built from the frame itself; this method has no `typ`
    # local, so referencing one here raised NameError instead of reporting
    # the protocol violation.
    message = "invalid frame type #{format('0x%02x', frame.f_type)} for stream identifier #{format('0x%02x', stream_id)}"
    return [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, message)]
  end

  if stream_id.zero?
    handle_connection_frame(frame)
  else
    handle_stream_frame(frame)
  end
end

#handle_connection_frame(frame) ⇒ Array<Object>, ...

rubocop: disable Metrics/CyclomaticComplexity

Parameters:

  • frame (Object)

Returns:



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/biryani/connection.rb', line 135

# Handles a frame addressed to stream 0 (the connection itself).
#
# Stream-only frame types are rejected with a PROTOCOL_ERROR connection error.
# SETTINGS and PING produce their acknowledgement (if any). GOAWAY is handed
# to the GOAWAY handler. WINDOW_UPDATE enlarges the connection send window and
# then drains buffered DATA. Unknown frame types are ignored.
#
# @param frame [Object] frame responding to #f_type
#
# @return [Array<Object>] frames to send and/or error objects
def handle_connection_frame(frame)
  # Kept in a local so the error message below can reference it; the message
  # previously used an undefined `typ`, raising NameError on this path.
  typ = frame.f_type
  case typ
  when FrameType::DATA, FrameType::HEADERS, FrameType::PRIORITY, FrameType::RST_STREAM, FrameType::PUSH_PROMISE, FrameType::CONTINUATION
    [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, "invalid frame type #{format('0x%02x', typ)} for stream identifier 0x00")]
  when FrameType::SETTINGS
    obj = self.class.handle_settings(frame, @peer_settings, @decoder, @streams_ctx)
    return [] if obj.nil?

    settings_ack = obj
    [settings_ack]
  when FrameType::PING
    obj = self.class.handle_ping(frame)
    return [] if obj.nil?

    ping_ack = obj
    [ping_ack]
  when FrameType::GOAWAY
    self.class.handle_goaway(frame)

    []
  when FrameType::WINDOW_UPDATE
    err = self.class.handle_connection_window_update(frame, @send_window)
    return [err] unless err.nil?

    max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE]
    @data_buffer.take!(@send_window, @streams_ctx, max_frame_size) # return DATA Frames
  else
    # ignore UNKNOWN Frame

    []
  end
end

#handle_response(io, res, stream_id) ⇒ Object

Parameters:



230
231
232
233
234
235
# File 'lib/biryani/connection.rb', line 230

# Sends a response on a stream: the encoded header block first, then the body
# as DATA when there is one.
#
# @param io [IO]
# @param res [Object] response handed to Connection.http_response
# @param stream_id [Integer]
#
# @return [Object, nil] the result of Connection.send_data, or nil when the
#   body is empty
def handle_response(io, res, stream_id)
  fragment, data = self.class.http_response(res, @encoder)
  max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE]
  headers_only = data.empty?

  self.class.send_headers(io, stream_id, fragment, headers_only, max_frame_size, @streams_ctx)
  return nil if headers_only

  self.class.send_data(io, stream_id, data, @send_window, max_frame_size, @streams_ctx, @data_buffer)
end

#handle_stream_frame(frame) ⇒ Array<Object>, ...

rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/MethodLength

Parameters:

  • frame (Object)

Returns:



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/biryani/connection.rb', line 175

# Handles a frame addressed to a non-zero stream.
#
# Connection-only frame types are rejected with a PROTOCOL_ERROR connection
# error. Otherwise the stream's state machine is advanced (creating the
# stream context if needed) and the frame is dispatched by type.
#
# @param frame [Object] frame responding to #stream_id and #f_type
#
# @return [Array<Object>] frames to send and/or error objects
def handle_stream_frame(frame)
  stream_id = frame.stream_id
  typ = frame.f_type
  return [ConnectionError.new(ErrorCode::PROTOCOL_ERROR, "invalid frame type #{format('0x%02x', typ)} for stream identifier #{format('0x%02x', stream_id)}")] \
    if [FrameType::SETTINGS, FrameType::PING, FrameType::GOAWAY].include?(typ)

  # SETTINGS_MAX_CONCURRENT_STREAMS is directional: the number of streams the
  # peer may open is bounded by the value this endpoint advertised (@settings),
  # not by the value the peer advertised (which bounds streams opened here).
  # See RFC 9113 sections 5.1.2 and 6.5.2.
  max_streams = @settings[SettingsID::SETTINGS_MAX_CONCURRENT_STREAMS]
  send_initial_window_size = @peer_settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE]
  recv_initial_window_size = @settings[SettingsID::SETTINGS_INITIAL_WINDOW_SIZE]
  obj = self.class.transition_stream_state_recv(frame, @streams_ctx, @data_buffer, stream_id, max_streams, send_initial_window_size, recv_initial_window_size)
  return [obj] if Biryani.err?(obj)

  ctx = obj
  case typ
  when FrameType::DATA
    obj = self.class.handle_data(stream_id, frame.data, @recv_window, @streams_ctx, @decoder)
    return [obj] if Biryani.err?(obj)

    obj # return WINDOW_UPDATE Frames
  when FrameType::HEADERS, FrameType::CONTINUATION
    err = self.class.handle_headers(frame, ctx, @decoder)
    return [err] unless err.nil?

    []
  when FrameType::PRIORITY
    # ignore PRIORITY Frame

    []
  when FrameType::PUSH_PROMISE
    # TODO

    []
  when FrameType::RST_STREAM
    self.class.handle_rst_stream(frame, ctx)

    []
  when FrameType::WINDOW_UPDATE
    err = self.class.handle_stream_window_update(frame, @streams_ctx)
    return [err] unless err.nil?

    # A larger stream window may unblock DATA that was buffered earlier.
    max_frame_size = @peer_settings[SettingsID::SETTINGS_MAX_FRAME_SIZE]
    @data_buffer.take!(@send_window, @streams_ctx, max_frame_size)
  else
    # ignore UNKNOWN Frame

    []
  end
end

#recv_dispatch(io, obj) ⇒ Array<Object>, ...

rubocop: disable Metrics/AbcSize rubocop: disable Metrics/CyclomaticComplexity rubocop: disable Metrics/PerceivedComplexity

Parameters:

  • io (IO)
  • obj (Object)

Returns:



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/biryani/connection.rb', line 90

# Processes one object produced by the receive loop and writes the replies.
#
# Three cases, checked in order:
# 1. obj is an error: it is converted to a frame via Biryani.unwrap and sent.
# 2. obj is a frame longer than the local SETTINGS_MAX_FRAME_SIZE: a GOAWAY
#    with FRAME_SIZE_ERROR is sent and the connection is closed.
# 3. otherwise the frame is dispatched and every resulting item is sent.
#    Sent WINDOW_UPDATE frames are credited to the matching receive window.
#
# The connection is closed whenever recording a sent frame reports that it
# should be.
#
# @param io [IO]
# @param obj [Object] a received frame or an error object
#
# @return [Object] not meaningful; differs per branch
def recv_dispatch(io, obj)
  if Biryani.err?(obj)
    error_frame = Biryani.unwrap(obj, @streams_ctx.last_stream_id)
    self.class.do_send(io, error_frame, true)
    close if self.class.transition_stream_state_send(error_frame, @streams_ctx)
  elsif obj.length > @settings[SettingsID::SETTINGS_MAX_FRAME_SIZE]
    goaway = Frame::Goaway.new(@streams_ctx.last_stream_id, ErrorCode::FRAME_SIZE_ERROR, 'payload length greater than SETTINGS_MAX_FRAME_SIZE')
    self.class.do_send(io, goaway, true)
    close
  else
    do_recv_dispatch(obj).each do |item|
      outgoing = Biryani.unwrap(item, @streams_ctx.last_stream_id)
      self.class.do_send(io, outgoing, true)

      if outgoing.f_type == FrameType::WINDOW_UPDATE
        # Stream 0 credits the connection window; any other id credits that stream.
        window = outgoing.stream_id.zero? ? @recv_window : @streams_ctx[outgoing.stream_id].recv_window
        window.increase!(outgoing.window_size_increment)
      end

      close if self.class.transition_stream_state_send(outgoing, @streams_ctx)
    end
  end
end

#recv_loop(io) ⇒ Object

Parameters:

  • io (IO)


55
56
57
58
59
60
61
62
63
64
# File 'lib/biryani/connection.rb', line 55

# Starts a Ractor that reads frames from the IO and forwards each one to a
# new port stored in @sock. The reader stops when Frame.read returns nil.
#
# @param io [IO] handed to the reader Ractor
#
# @return [Ractor] the reader Ractor
def recv_loop(io)
  @sock = Ractor::Port.new
  Ractor.new(io, @sock) do |reader_io, port|
    loop do
      received = Frame.read(reader_io)
      break if received.nil?

      port.send(received, move: true)
    end
  end
end

#select_loop(io) ⇒ Object

Parameters:

  • io (IO)


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/biryani/connection.rb', line 67

# Main event loop: waits on both the receive port and the streams' response
# port, and handles whichever yields first.
#
# The loop ends when the receive port is closed and no streams remain, or as
# soon as the connection has been marked closed.
#
# @param io [IO] used for all writes
def select_loop(io)
  loop do
    break if @sock.closed? && @streams_ctx.empty?

    source, payload = Ractor.select(@sock, @streams_ctx.tx)
    if source != @sock
      # Anything not from the receive port is a [response, stream_id] pair.
      response, stream_id = payload
      handle_response(io, response, stream_id)
    else
      recv_dispatch(io, payload)
    end

    break if closed?
  end
end

#serve(io) ⇒ Object

Parameters:

  • io (IO)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/biryani/connection.rb', line 36

# Serves one HTTP/2 connection on the given IO.
#
# Validates the client connection preface (answering with GOAWAY if it is
# wrong), sends the initial SETTINGS frame, then starts the reader and runs
# the event loop. Any StandardError is reported with an INTERNAL_ERROR GOAWAY.
# The write side of the IO is always closed on the way out.
#
# @param io [IO]
def serve(io)
  preface_error = self.class.read_http2_magic(io)
  if preface_error.nil?
    self.class.do_send(io, Frame::Settings.new(false, 0, {}), true)

    # The reader gets its own clone of the IO; writes stay on the original.
    recv_loop(io.clone)
    select_loop(io)
  else
    self.class.do_send(io, preface_error.goaway(@streams_ctx.last_stream_id), true)
    nil
  end
rescue StandardError => e
  puts e.backtrace
  self.class.do_send(io, Frame::Goaway.new(@streams_ctx.last_stream_id, ErrorCode::INTERNAL_ERROR, 'internal error'), true)
ensure
  io.close_write
end