Class: Hyperion::Http2Handler::RequestStream

Inherits:
Protocol::HTTP2::Stream
  • Object
show all
Defined in:
lib/hyperion/http2_handler.rb

Constant Summary collapse

VALID_REQUEST_PSEUDO_HEADERS =

RFC 7540 §8.1.2.1 — the only pseudo-headers a server MUST accept on a request. Anything else (notably ‘:status`, which is response-only, or an unknown `:foo`) is a malformed request that we reject with PROTOCOL_ERROR.

%w[:method :path :scheme :authority].freeze
FORBIDDEN_HEADERS =

RFC 7540 §8.1.2.2 — these connection-specific headers MUST NOT appear in HTTP/2 requests; their semantics are folded into HTTP/2 framing.

%w[connection transfer-encoding keep-alive upgrade proxy-connection].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeRequestStream

Returns a new instance of RequestStream.



446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/hyperion/http2_handler.rb', line 446

def initialize(*)
  super
  @request_headers = []
  # 2.12-F — gRPC carries opaque protobuf bytes
  # ([1-byte compressed flag][4-byte length-prefix][message bytes]) in the
  # request body. The default UTF-8 encoding on a `+''` literal would
  # break valid_encoding? on byte sequences that don't form UTF-8
  # codepoints, leading to a Rack app reading `body.string` and getting
  # a String that misreports its bytesize / corrupts when string-
  # interpolated. ASCII_8BIT (binary) preserves bytes verbatim and is
  # the encoding gRPC Ruby clients expect. Same change is applied to
  # the HTTP/1.1 path as a separate concern; see Connection.
  @request_body = String.new(encoding: Encoding::ASCII_8BIT)
  @request_body_bytes = 0
  @request_complete = false
  @window_available = ::Async::Notification.new
  @protocol_error_reason = nil
  @declared_content_length = nil
  # 2.13-D — gRPC streaming RPCs. When the request HEADERS block carries
  # `content-type: application/grpc*` AND `te: trailers`, we promote the
  # stream into "streaming-input mode": DATA frames are pushed into a
  # `StreamingInput` queue (vs. accumulated into `@request_body`) and
  # the dispatch loop fires the app on HEADERS arrival (vs. END_STREAM),
  # so client-streaming + bidirectional RPCs work. Plain HTTP/2 traffic
  # keeps the pre-2.13-D buffered semantic.
  @streaming_input = nil
  @streaming_dispatch_ready = false
end

Instance Attribute Details

#protocol_error_reasonObject (readonly)

Returns the value of attribute protocol_error_reason.



443
444
445
# File 'lib/hyperion/http2_handler.rb', line 443

def protocol_error_reason
  @protocol_error_reason
end

#request_bodyObject (readonly)

Returns the value of attribute request_body.



443
444
445
# File 'lib/hyperion/http2_handler.rb', line 443

def request_body
  @request_body
end

#request_completeObject (readonly)

Returns the value of attribute request_complete.



443
444
445
# File 'lib/hyperion/http2_handler.rb', line 443

def request_complete
  @request_complete
end

#request_headersObject (readonly)

Returns the value of attribute request_headers.



443
444
445
# File 'lib/hyperion/http2_handler.rb', line 443

def request_headers
  @request_headers
end

#streaming_inputObject (readonly)

Returns the value of attribute streaming_input.



443
444
445
# File 'lib/hyperion/http2_handler.rb', line 443

def streaming_input
  @streaming_input
end

Instance Method Details

#dispatchable?Boolean

2.13-D — generic predicate used by the serve loop instead of ‘request_complete`. True for both code paths.

Returns:

  • (Boolean)


549
550
551
# File 'lib/hyperion/http2_handler.rb', line 549

def dispatchable?
  @request_complete || @streaming_dispatch_ready
end

#maybe_promote_streaming_input!Object

2.13-D — promote the stream into streaming-input mode when the request HEADERS look like a gRPC RPC. Detection rules (intentionally narrow so we don’t accidentally streaming-promote plain HTTP/2):

* `content-type` starts with `application/grpc` (covers `+proto`,
  `+json`, etc.). gRPC's MIME-type registry guarantees this prefix.
* `te: trailers` is present (RFC 7230 §4 — gRPC requires it on every
  request). HTTP/2 §8.1.2.2 already constrains `te` to `trailers`,
  so any request that carries `te` at all hit our validator first.

When promoted, we allocate the ‘StreamingInput` queue and arm `@streaming_dispatch_ready` so the serve loop can dispatch the app immediately (instead of waiting for END_STREAM).

The ‘:method` check (POST only) is defensive: gRPC RPCs are POST. GET-shaped requests with these headers are almost certainly a bug in the caller, not an intentional gRPC streaming request — fall through to the buffered path and let the app sort it out.



658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# File 'lib/hyperion/http2_handler.rb', line 658

def maybe_promote_streaming_input!
  method = pseudo_value(':method')
  return unless method == 'POST'

  ct = nil
  te = nil
  @request_headers.each do |pair|
    name = pair[0].to_s
    ct ||= pair[1].to_s if name == 'content-type'
    te ||= pair[1].to_s if name == 'te'
  end
  return unless ct && ct.start_with?('application/grpc')
  return unless te && te.downcase.strip == 'trailers'

  @streaming_input = StreamingInput.new
  @streaming_dispatch_ready = true
end

#process_data(frame) ⇒ Object



511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# File 'lib/hyperion/http2_handler.rb', line 511

def process_data(frame)
  data = super
  # rubocop:disable Rails/Present
  if data && !data.empty?
    if @streaming_input
      # 2.13-D — gRPC streaming-input: push DATA-frame bytes into the
      # queue Rack apps read from via `env['rack.input']`. Tracking
      # `@request_body_bytes` for content-length cross-check still
      # applies (a streaming gRPC request that advertises content-
      # length would still be wire-validated), but we deliberately
      # SKIP `@request_body << data` — the streaming path doesn't
      # buffer bytes a second time.
      @streaming_input.push(data)
    else
      @request_body << data
    end
    @request_body_bytes += data.bytesize
  end
  # rubocop:enable Rails/Present
  if frame.end_stream?
    validate_body_length! unless protocol_error?
    @request_complete = true
    @streaming_input&.close_writer
  end
  data
end

#process_headers(frame) ⇒ Object



482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
# File 'lib/hyperion/http2_handler.rb', line 482

def process_headers(frame)
  decoded = super
  # First HEADERS frame on a stream carries the request header block;
  # any later HEADERS frame is trailers (§8.1) and we deliberately do
  # not re-validate (re-running the validator would see the original
  # request pseudo-headers plus the new trailer block and falsely flag
  # them as misordered).
  first_block = @request_headers.empty?
  # decoded is an Array of [name, value] pairs (HPACK output).
  decoded.each { |pair| @request_headers << pair }
  # Run RFC 7540 §8.1.2 validation as soon as we have a complete header
  # block. We do it here (not at end_stream) so the dispatcher sees the
  # error flag before it spawns a fiber for the request.
  if first_block && !protocol_error?
    validate_request_headers!
    # 2.13-D — promote to streaming-input mode for gRPC requests so
    # client-streaming + bidi RPCs see DATA frames as they arrive.
    maybe_promote_streaming_input! unless protocol_error?
  end
  if frame.end_stream?
    validate_body_length! unless protocol_error?
    @request_complete = true
    # 2.13-D — closing the writer side wakes any reader fiber the
    # app has parked on `rack.input.read`.
    @streaming_input&.close_writer
  end
  decoded
end

#protocol_error?Boolean

Used by the dispatch loop to decide whether to invoke the app or send RST_STREAM PROTOCOL_ERROR. Set by ‘validate_request_headers!` and `validate_body_length!`.

Returns:

  • (Boolean)


478
479
480
# File 'lib/hyperion/http2_handler.rb', line 478

def protocol_error?
  !@protocol_error_reason.nil?
end

#streaming_dispatch_ready?Boolean

2.13-D — was this stream marked dispatchable on HEADERS arrival? The serve-loop dispatches both ‘request_complete` streams (unary path: app fires after END_STREAM) AND `streaming_dispatch_ready` streams (gRPC streaming path: app fires after HEADERS so it can read DATA frames as they land).

Returns:

  • (Boolean)


543
544
545
# File 'lib/hyperion/http2_handler.rb', line 543

def streaming_dispatch_ready?
  @streaming_dispatch_ready
end

#validate_body_length!Object

RFC 7540 §8.1.2.6 — if ‘content-length` was advertised, the actual number of DATA bytes received (across all DATA frames) MUST match.



615
616
617
618
619
620
621
622
# File 'lib/hyperion/http2_handler.rb', line 615

def validate_body_length!
  return if @declared_content_length.nil?
  return if @declared_content_length == @request_body_bytes

  fail_validation!(
    "content-length mismatch: declared #{@declared_content_length}, received #{@request_body_bytes}"
  )
end

#validate_request_headers!Object

RFC 7540 §8.1.2 — request header validation. Sets ‘@protocol_error_reason` on the first violation we hit; the dispatch loop turns that into RST_STREAM PROTOCOL_ERROR.



556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
# File 'lib/hyperion/http2_handler.rb', line 556

def validate_request_headers!
  seen_regular = false
  pseudo_counts = Hash.new(0)
  @request_headers.each do |pair|
    name, value = pair
    name = name.to_s
    if name.start_with?(':')
      # §8.1.2.1: pseudo-headers MUST precede regular headers.
      return fail_validation!('pseudo-header after regular header') if seen_regular
      # §8.1.2.1: only the four request pseudo-headers are valid; in
      # particular, `:status` is response-only.
      unless VALID_REQUEST_PSEUDO_HEADERS.include?(name)
        return fail_validation!("invalid request pseudo-header: #{name}")
      end

      pseudo_counts[name] += 1
    else
      seen_regular = true
      # §8.1.2: header names must be lowercase in HTTP/2.
      return fail_validation!('uppercase header name') if /[A-Z]/.match?(name)
      # §8.1.2.2: connection-specific headers are forbidden.
      return fail_validation!("forbidden connection-specific header: #{name}") if FORBIDDEN_HEADERS.include?(name)
      # §8.1.2.2: TE may only carry the value `trailers`.
      if name == 'te' && value.to_s.downcase.strip != 'trailers'
        return fail_validation!('TE header with non-trailers value')
      end

      # Track declared content-length for later body-byte cross-check.
      @declared_content_length = value.to_s.to_i if name == 'content-length'
    end
  end

  # §8.1.2.3: every pseudo-header may appear at most once.
  pseudo_counts.each do |name, count|
    return fail_validation!("duplicated pseudo-header: #{name}") if count > 1
  end

  method = pseudo_value(':method')
  # CONNECT (§8.3) has its own rules; everything else MUST carry
  # :method, :scheme and a non-empty :path.
  if method == 'CONNECT'
    return fail_validation!('CONNECT with :scheme') if pseudo_value(':scheme')
    return fail_validation!('CONNECT with :path') if pseudo_value(':path')
    return fail_validation!('CONNECT without :authority') unless pseudo_value(':authority')
  else
    return fail_validation!('missing :method') if method.nil? || method.empty?

    scheme = pseudo_value(':scheme')
    return fail_validation!('missing :scheme') if scheme.nil? || scheme.empty?

    path = pseudo_value(':path')
    return fail_validation!('missing or empty :path') if path.nil? || path.empty?
  end

  nil
end

#wait_for_windowObject

Block the calling fiber until the remote window grows. Cheap no-op signal each time ‘window_updated` fires; the caller re-checks available_frame_size in a loop.



636
637
638
# File 'lib/hyperion/http2_handler.rb', line 636

def wait_for_window
  @window_available.wait
end

#window_updated(size) ⇒ Object

Called by protocol-http2 whenever the remote peer’s flow-control window opens up — either via a stream-level WINDOW_UPDATE or via the connection-level fan-out in ‘Connection#consume_window`. We poke the notification so any fiber waiting in `wait_for_window` resumes.



628
629
630
631
# File 'lib/hyperion/http2_handler.rb', line 628

def window_updated(size)
  @window_available.signal
  super
end