Class: Raptor::Http2

Inherits:
Object
  • Object
show all
Defined in:
lib/raptor/http2.rb

Overview

Handles HTTP/2 request processing and Rack application integration.

Http2 manages the HTTP/2 protocol lifecycle including frame processing, HPACK header compression, stream management, and response writing. It integrates with the same reactor, ractor pool, and thread pool pipeline used by HTTP/1.1 connections.

Defined Under Namespace

Classes: FlowControl, Writer

Constant Summary collapse

FLAG_END_STREAM =
0x1
FLAG_END_HEADERS =
0x4
FLAG_ACK =
0x1
FLAG_PRIORITY =
0x20
ERROR_NO_ERROR =
0x0
ERROR_PROTOCOL_ERROR =
0x1
DEFAULT_WINDOW_SIZE =
65_535
MAX_FRAME_SIZE =
16_384
SERVER_PROTOCOL =
"HTTP/2"
RACK_HEADER_PREFIX =
"rack."
HOP_BY_HOP_HEADERS =
Set.new(%w[connection transfer-encoding keep-alive upgrade proxy-connection]).freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app, server_port, on_error: nil) ⇒ void

Creates a new Http2 handler.

Parameters:

  • app (#call)

    the Rack application to dispatch requests to

  • server_port (Integer)

    port number used to populate SERVER_PORT in the Rack env

  • on_error (#call, nil) (defaults to: nil)

    callback invoked with (env, exception) when the Rack app raises



252
253
254
255
256
# File 'lib/raptor/http2.rb', line 252

def initialize(app, server_port, on_error: nil)
  @app = app
  @server_port = server_port
  @on_error = on_error
end

Class Method Details

.build_server_settings_frameString

Builds the initial server SETTINGS frame to send on connection establishment.

Returns:

  • (String)

    the encoded SETTINGS frame



263
264
265
266
267
268
269
270
# File 'lib/raptor/http2.rb', line 263

def self.build_server_settings_frame
  parser = Http2Parser.new
  settings_payload = parser.build_settings(
    max_concurrent_streams: 100,
    initial_window_size: DEFAULT_WINDOW_SIZE
  )
  parser.build_frame(:settings, 0, 0, settings_payload)
end

.process_frames(data) ⇒ Hash

Processes HTTP/2 frames from the connection buffer.

Parses frames, handles HPACK decoding, tracks stream state, and returns updated connection state along with any outgoing protocol frames and completed stream requests. Ractor-safe.

Parameters:

  • data (Hash)

    the connection state including buffer and HPACK table

Returns:

  • (Hash)

    updated state with outgoing_frames and completed_requests



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/raptor/http2.rb', line 282

def self.process_frames(data)
  parser = Http2Parser.new
  buffer = data[:buffer]
  hpack_table = data[:hpack_table] || []
  streams = data[:http2_streams] ? data[:http2_streams].dup : {}
  outgoing_frames = []
  completed_requests = []
  window_updates = []
  peer_initial_window_size = nil
  connection_window = data[:http2_window] || DEFAULT_WINDOW_SIZE
  preface_received = data[:http2_preface_received] || false
  last_client_stream_id = data[:http2_last_client_stream_id] || 0
  pending_headers = data[:http2_pending_headers]
  goaway_error = nil

  unless preface_received
    if buffer.bytesize >= 24 && buffer.byteslice(0, 24) == Http2Parser.connection_preface
      buffer = buffer.byteslice(24..-1) || ""
      preface_received = true
    else
      return build_result(data, buffer, hpack_table, streams, outgoing_frames, completed_requests, window_updates, peer_initial_window_size, connection_window, preface_received, last_client_stream_id, pending_headers, false)
    end
  end

  loop do
    parsed = parser.parse_frame(buffer)
    break unless parsed

    frame, consumed = parsed
    buffer = buffer.byteslice(consumed..-1) || ""

    if pending_headers && frame[:type] != :continuation
      goaway_error = ERROR_PROTOCOL_ERROR
      break
    end

    case frame[:type]
    when :settings
      if (frame[:flags] & FLAG_ACK).zero?
        parsed_settings = parser.parse_settings(frame[:payload])
        peer_initial_window_size = parsed_settings[:initial_window_size] if parsed_settings.key?(:initial_window_size)
        outgoing_frames << parser.build_frame(:settings, FLAG_ACK, 0, nil)
      end

    when :headers
      stream_id = frame[:stream_id]
      header_payload = frame[:payload]

      unless streams.key?(stream_id)
        if stream_id.even? || stream_id <= last_client_stream_id
          goaway_error = ERROR_PROTOCOL_ERROR
          break
        end
        last_client_stream_id = stream_id
      end

      if (frame[:flags] & FLAG_PRIORITY) != 0
        header_payload = header_payload.byteslice(5..-1) || ""
      end

      end_stream = (frame[:flags] & FLAG_END_STREAM) != 0

      if (frame[:flags] & FLAG_END_HEADERS) != 0
        decoded_headers, hpack_table = parser.parse_headers(header_payload, hpack_table)
        streams, completed_requests = finalize_headers(streams, completed_requests, stream_id, decoded_headers, end_stream)
      else
        pending_headers = { stream_id: stream_id, buffer: header_payload, end_stream: end_stream }
      end

    when :continuation
      if pending_headers.nil? || frame[:stream_id] != pending_headers[:stream_id]
        goaway_error = ERROR_PROTOCOL_ERROR
        break
      end

      pending_headers = pending_headers.merge(buffer: pending_headers[:buffer] + frame[:payload])

      if (frame[:flags] & FLAG_END_HEADERS) != 0
        stream_id = pending_headers[:stream_id]
        decoded_headers, hpack_table = parser.parse_headers(pending_headers[:buffer], hpack_table)
        streams, completed_requests = finalize_headers(streams, completed_requests, stream_id, decoded_headers, pending_headers[:end_stream])
        pending_headers = nil
      end

    when :data
      stream_id = frame[:stream_id]

      unless streams.key?(stream_id)
        goaway_error = ERROR_PROTOCOL_ERROR
        break
      end

      stream = streams[stream_id]
      existing_body = stream[:body] || ""
      stream = stream.merge(body: existing_body + frame[:payload])

      if frame[:payload].bytesize.positive?
        connection_window -= frame[:payload].bytesize
        if connection_window < DEFAULT_WINDOW_SIZE / 2
          increment = DEFAULT_WINDOW_SIZE - connection_window
          wu_payload = [increment].pack("N")
          outgoing_frames << parser.build_frame(:window_update, 0, 0, wu_payload)
          outgoing_frames << parser.build_frame(:window_update, 0, stream_id, wu_payload)
          connection_window += increment
        end
      end

      if (frame[:flags] & FLAG_END_STREAM) != 0
        stream_headers = stream[:headers] || []
        completed_requests << {
          stream_id: stream_id,
          headers: stream_headers,
          body: stream[:body]
        }

        streams.delete(stream_id)
      else
        streams[stream_id] = stream
      end

    when :window_update
      increment = parser.parse_window_update(frame[:payload])
      window_updates << [frame[:stream_id], increment]

    when :ping
      if (frame[:flags] & FLAG_ACK).zero?
        outgoing_frames << parser.build_frame(:ping, FLAG_ACK, 0, frame[:payload])
      end

    when :goaway
      break

    when :rst_stream
      streams.delete(frame[:stream_id])
    end
  end

  if goaway_error
    goaway_payload = [last_client_stream_id, goaway_error].pack("NN")
    outgoing_frames << parser.build_frame(:goaway, 0, 0, goaway_payload)
  end

  build_result(data, buffer, hpack_table, streams, outgoing_frames, completed_requests, window_updates, peer_initial_window_size, connection_window, preface_received, last_client_stream_id, pending_headers, !goaway_error.nil?)
end

Instance Method Details

#handle_parsed_request(result, reactor, thread_pool) ⇒ void

This method returns an undefined value.

Handles a parsed HTTP/2 request from the ractor pool.

Writes outgoing protocol frames to the socket, updates reactor state, and dispatches completed stream requests to the thread pool.

Parameters:

  • result (Hash)

    the parsed result from the ractor pool

  • reactor (Reactor)

    the reactor managing the connection

  • thread_pool (AtomicThreadPool)

    thread pool for Rack app dispatch



510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# File 'lib/raptor/http2.rb', line 510

def handle_parsed_request(result, reactor, thread_pool)
  socket = reactor.socket_for(result[:id])
  return unless socket

  writer = reactor.writer_for(result[:id])
  flow_control = reactor.flow_control_for(result[:id])

  if flow_control && (result[:window_updates] || result[:peer_initial_window_size])
    apply_flow_control_updates(flow_control, result)
  end

  writer.write_frames(socket, result[:outgoing_frames])

  if result[:close_connection]
    reactor.close_connection(result[:id])
    return
  end

  reactor.update_http2_state(result)

  result[:completed_requests]&.each do |request|
    stream_id = request[:stream_id]
    remote_addr = result[:remote_addr] || "127.0.0.1"

    thread_pool << proc do
      dispatch_stream_request(
        socket, writer, flow_control, stream_id,
        request[:headers], request[:body],
        remote_addr: remote_addr
      )
    end
  end
end