Class: Hyperion::Http2Handler

Inherits:
Object
Defined in:
lib/hyperion/http2_handler.rb

Overview

Real HTTP/2 dispatch driven by `protocol-http2`.

Each TLS connection that negotiated `h2` via ALPN ends up here. We frame the socket, read the connection preface, and then drive a frame loop on the connection’s fiber: it reads one frame at a time and lets `protocol-http2` update its connection/stream state machines. As soon as a client stream finishes its request half (state `:half_closed_remote` via `end_stream?`), we hand the stream off to a sibling fiber for dispatch, so slow handlers no longer block other streams on the same connection.

## Outbound write architecture (1.6.0+)

Pre-1.6.0, every framer write (HEADERS / DATA / RST_STREAM / GOAWAY) ran under one connection-scoped `Mutex#synchronize { socket.write(…) }`. That capped per-connection h2 throughput at “one socket-write at a time” regardless of stream count: a slow socket (kernel send buffer full, remote peer reading slowly) blocked every other stream’s writes too.

1.6.0 splits the path:

* The HPACK encode + frame format step is fast (microseconds, in-memory)
  and remains serialized on the calling fiber via `@encode_mutex`. HPACK
  state is stateful across HEADERS frames per connection, and frames for
  a single stream must be wire-ordered (HEADERS → DATA → END_STREAM).
  Holding the encode mutex across a `send_*` call accomplishes both.
* The framer writes through a `SendQueueIO` wrapper (wraps the real
  socket). `SendQueueIO#write(bytes)` enqueues onto a connection-wide
  `@send_queue` and signals `@send_notify`; it never touches the real
  socket.
* A dedicated **writer fiber** owns the real socket. It pops byte chunks
  off the queue, writes them, and parks on `@send_notify` when empty.
  Only this fiber ever calls `socket.write` — the SSLSocket cross-fiber
  unsafety constraint is satisfied.

Net effect: the slow-socket case no longer serializes encode work across streams. A stream that has bytes ready to encode can encode and enqueue while the writer is mid-flush of an earlier chunk. The mutex hold time drops from “until the kernel accepts the write” to “until the bytes are appended to the in-memory queue.”
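
The split described above can be sketched with the standard library alone. This is a minimal illustration, not Hyperion's implementation: `SketchSendQueueIO` and `run_sketch_writer` are hypothetical names, a `Thread` stands in for the dedicated writer fiber, and a `StringIO` stands in for the TLS socket.

```ruby
require 'stringio'

# Hypothetical stand-in for SendQueueIO: it only appends to an in-memory
# queue and never touches the real socket.
class SketchSendQueueIO
  def initialize(queue)
    @queue = queue
  end

  def write(bytes)
    @queue << bytes.b
    bytes.bytesize
  end
end

# Hypothetical writer loop: the only code that calls `write` on the socket.
# Queue#pop returns nil once the queue is closed and fully drained.
def run_sketch_writer(socket, queue)
  while (chunk = queue.pop)
    socket.write(chunk)
  end
end

queue  = Queue.new
socket = StringIO.new
writer = Thread.new { run_sketch_writer(socket, queue) }

io           = SketchSendQueueIO.new(queue)
encode_mutex = Mutex.new

# The mutex is held only while bytes are appended to the queue,
# never while the socket write is in progress.
encode_mutex.synchronize { io.write('HEADERS') }
encode_mutex.synchronize { io.write('DATA') }

queue.close # shutdown: the writer drains what is left, then exits
writer.join
puts socket.string # prints "HEADERSDATA"
```

Because a single consumer drains a FIFO queue, bytes reach the socket in the order they were enqueued, which is what keeps per-stream frame ordering intact in this sketch.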

Backpressure: pathological clients (slow-read h2) could otherwise let the queue grow without bound. We track `@pending_bytes`; once it exceeds `MAX_PER_CONN_PENDING_BYTES`, encoding fibers wait on `@drained_notify` before enqueueing more. The writer signals `@drained_notify` after each drain pass.
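
The backpressure rule can be illustrated with a small byte budget. This is a sketch under stated assumptions: `SketchPendingBytes` is a hypothetical class, and a `Mutex` plus `ConditionVariable` stand in for the fiber-level `@drained_notify` signalling used by the real handler.

```ruby
# Hypothetical byte budget: encoders reserve before enqueueing, the writer
# releases after each drain pass. Not Hyperion's WriterContext.
class SketchPendingBytes
  attr_reader :pending

  def initialize(limit)
    @limit   = limit
    @pending = 0
    @lock    = Mutex.new
    @drained = ConditionVariable.new
  end

  # Encoder side: wait while the queue is over the cap, then account
  # for the bytes about to be enqueued.
  def reserve(bytesize)
    @lock.synchronize do
      @drained.wait(@lock) while @pending > @limit
      @pending += bytesize
    end
  end

  # Writer side: called after bytes were written to the socket.
  def release(bytesize)
    @lock.synchronize do
      @pending -= bytesize
      @drained.broadcast
    end
  end
end

budget = SketchPendingBytes.new(10)
budget.reserve(8) # 8 pending, under the cap
budget.reserve(8) # 16 pending, now over the cap

blocked       = Thread.new { budget.reserve(4) }
still_waiting = blocked.join(0.1).nil? # the encoder is parked

budget.release(8) # writer drained 8 bytes, 8 pending
blocked.join      # encoder resumes and reserves its 4 bytes
puts still_waiting  # prints "true"
puts budget.pending # prints "12"
```

Note that the check happens before the reservation, so the queue can overshoot the cap by one encoder's worth of bytes; the cap bounds growth rather than enforcing an exact ceiling.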

Flow control: `RequestStream#window_updated` overrides the protocol-http2 default to fan a notification out to any fiber blocked in `send_body` waiting for the remote peer’s flow-control window to grow. The body writer chunks the response payload by the per-stream available frame size and yields on the notification when the window is exhausted, so large bodies never trip a FlowControlError.
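
The chunking rule can be shown as a pure function. This is an illustrative sketch, not the handler's `send_body`: `send_body_sketch` is a hypothetical name, and the `window_updates` array simulates the WINDOW_UPDATE increments that the real code would wait for on a notification.

```ruby
# Each DATA frame carries at most min(remaining body, flow-control window,
# max frame size) bytes. When the window hits zero the real writer parks
# until `window_updated` fires; here we consume a simulated increment.
def send_body_sketch(body, window:, max_frame:, window_updates:)
  frames = []
  offset = 0

  while offset < body.bytesize
    if window.zero?
      grant = window_updates.shift
      raise 'peer never reopened the window' if grant.nil?

      window += grant
      next
    end

    size = [body.bytesize - offset, window, max_frame].min
    frames << body.byteslice(offset, size)
    offset += size
    window -= size
  end

  frames
end

frames = send_body_sketch('x' * 10, window: 4, max_frame: 3, window_updates: [5, 5])
p frames.map(&:bytesize) # prints [3, 1, 3, 2, 1]
```

No frame ever exceeds the window that was available when it was cut, which is the property that keeps a sender from tripping a flow-control error.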

Defined Under Namespace

Classes: RequestStream, SendQueueIO, StreamingInput, WriterContext

Constant Summary

MAX_PER_CONN_PENDING_BYTES =

Cap on bytes that may sit in a connection’s send queue waiting for the writer fiber to drain. Slow-read h2 clients can otherwise let an encoder fiber pile arbitrary bytes into RAM. 16 MiB matches the upper bound a well-behaved peer will buffer — anything beyond that is the writer being starved, and the right answer is to backpressure the encoder rather than allocate more.

16 * 1024 * 1024
SETTINGS_KEY_MAP =

Maps Hyperion-friendly setting names to the integer SETTINGS_* identifiers protocol-http2 uses on the wire. See RFC 7540 §6.5.2 — these are the only four parameters Hyperion exposes; the rest of the SETTINGS frame (HEADER_TABLE_SIZE, ENABLE_PUSH, etc.) keeps protocol-http2’s default.

{
  max_concurrent_streams: ::Protocol::HTTP2::Settings::MAXIMUM_CONCURRENT_STREAMS,
  initial_window_size: ::Protocol::HTTP2::Settings::INITIAL_WINDOW_SIZE,
  max_frame_size: ::Protocol::HTTP2::Settings::MAXIMUM_FRAME_SIZE,
  max_header_list_size: ::Protocol::HTTP2::Settings::MAXIMUM_HEADER_LIST_SIZE
}.freeze
H2_MIN_FRAME_SIZE =

RFC 7540 §6.5.2 floor for SETTINGS_MAX_FRAME_SIZE. protocol-http2 raises ProtocolError on values below this; we clamp + warn instead so a misconfigured operator gets a working server, not a boot-time crash.

0x4000
H2_MAX_FRAME_SIZE =

RFC 7540 §6.5.2 ceiling for SETTINGS_MAX_FRAME_SIZE.

0xFFFFFF
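
The "clamp + warn" policy for `max_frame_size` might look like the sketch below. The module and method names are hypothetical, warnings are collected into an array rather than sent to the logger, and clamping at the ceiling is an assumption: the description above only states the behaviour for values below the floor.

```ruby
module FrameSizeSketch
  MIN_FRAME_SIZE = 0x4000   # 16_384, the RFC 7540 §6.5.2 floor
  MAX_FRAME_SIZE = 0xFFFFFF # 16_777_215, the RFC 7540 §6.5.2 ceiling

  # Returns a usable frame size and records a warning when the operator's
  # value had to be adjusted. (Hypothetical helper, not Hyperion's API.)
  def self.clamp_max_frame_size(value, warnings)
    clamped = value.clamp(MIN_FRAME_SIZE, MAX_FRAME_SIZE)
    unless clamped == value
      warnings << "max_frame_size #{value} out of range, using #{clamped}"
    end
    clamped
  end
end

warnings = []
p FrameSizeSketch.clamp_max_frame_size(100, warnings)    # prints 16384
p FrameSizeSketch.clamp_max_frame_size(32_768, warnings) # prints 32768
p FrameSizeSketch.clamp_max_frame_size(2**30, warnings)  # prints 16777215
p warnings.size                                          # prints 2
```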
H2_MAX_WINDOW_SIZE =

RFC 7540 §6.9.2 — INITIAL_WINDOW_SIZE has the same 31-bit max as the WINDOW_UPDATE frame’s Window Size Increment (see protocol-http2’s MAXIMUM_ALLOWED_WINDOW_SIZE).

0x7FFFFFFF
DISPATCH_POOL_DEFAULT =

2.11-A — pre-spawned dispatch worker pool sizing.

Default `4` workers per connection: enough to absorb the typical HTTP/2 burst (2-8 concurrent streams) without paying any per-stream `task.async {}` cost on the hot path. Operators on long-lived high-fan-out connections (e.g. an aggregator backend that fans out 30+ parallel streams) can raise this with `HYPERION_H2_DISPATCH_POOL`. Streams that arrive when the pool is saturated still get an ad-hoc fiber (see `serve` below), so concurrency is never artificially capped; the operator-facing limit is `h2.max_concurrent_streams`.

Ceiling at 16 guards against a pathological config that would spawn hundreds of idle fibers per accepted connection. Anything malformed / non-positive falls back to the default rather than crashing the connection — this is a tuning knob, not a spec parameter.

4
DISPATCH_POOL_MAX =
16

Instance Method Summary

Constructor Details

#initialize(app:, thread_pool: nil, h2_settings: nil, runtime: nil, h2_admission: nil) ⇒ Http2Handler

1.7.0 added kwargs:

* `runtime:`      — `Hyperion::Runtime` for metrics/logger
                    isolation (default `Runtime.default`).
* `h2_admission:` — Optional `Hyperion::H2Admission` for the
                    per-process stream cap (RFC A7). nil keeps
                    the 1.6.x unbounded behaviour.

2.0.0 (Phase 6b) probed `Hyperion::H2Codec.available?` at construction so the handler knew whether the native HPACK path was operational, but the connection state machine still drove encode/decode through `protocol-http2`’s pure-Ruby Compressor / Decompressor.

2.2.0 (Phase 10 / RFC §3 Phase 6c) ships the wiring infrastructure: `Hyperion::Http2::NativeHpackAdapter` and `#install_native_hpack` replace the per-connection HPACK encode/decode boundary with the Rust crate if and only if both of the following hold:

1. `Hyperion::H2Codec.available?` is true (cdylib loaded), AND
2. `ENV['HYPERION_H2_NATIVE_HPACK']` is one of `1`/`true`/`yes`/`on`.

In 2.2.0 the default was OFF because local h2load benchmarking on macOS showed the Fiddle FFI per-call marshalling overhead dominating for typical 3–8-header HEADERS frames: the standalone microbench’s 3.26× encode win would not translate to wire wins until the FFI marshalling layer was rewritten to amortize allocation. Keeping the default OFF preserved 2.0.0/2.1.0 behavior, and setting the env var let operators A/B test the swap in their own environment. The 2.5-B comment in the method body below records the later switch to ON by default when the Rust crate is available. In either mode, the framer, stream state machine, flow control, and HEADERS / CONTINUATION framing all stay in `protocol-http2`; only the HPACK byte-pump is replaced when the swap is enabled. Frame ser/de in Rust (Phase 6d) is a separate, larger lift.



# File 'lib/hyperion/http2_handler.rb', line 754

def initialize(app:, thread_pool: nil, h2_settings: nil, runtime: nil, h2_admission: nil)
  @app          = app
  @thread_pool  = thread_pool
  @h2_settings  = h2_settings
  if runtime
    @runtime = runtime
    @metrics = runtime.metrics
    @logger  = runtime.logger
  else
    # 1.6.x compat path — see Connection#initialize for rationale.
    @runtime = Hyperion::Runtime.default
    @metrics = Hyperion.metrics
    @logger  = Hyperion.logger
  end
  @h2_admission       = h2_admission
  # 2.12-E — per-worker request counter label. Identical caching
  # rationale to Connection#initialize: process-constant ID, looked
  # up once and held in the ivar.
  @worker_id          = Process.pid.to_s
  @h2_codec_available = Hyperion::H2Codec.available?
  # 2.5-B [breaking-default-change]: native HPACK now defaults to ON
  # when the Rust crate is available. The 2026-04-30 Rails-shape
  # bench (`bench/h2_rails_shape.ru`, 25 response headers) measured
  # native v3 at 1,418 r/s vs Ruby fallback 1,201 r/s — **+18.0%**
  # on a header-heavy workload, comfortably above the +15% flip
  # threshold. 2.4-A's hello-shape bench saw parity because HPACK
  # is <1% of per-stream CPU on a 2-header response.
  #
  # 2.11-B — `HYPERION_H2_NATIVE_HPACK` extended with a native-mode
  # axis (`auto` / `cglue` / `v2` / `off`). See `resolve_h2_native_hpack_state`.
  # Operators who want the prior 2.4.x default (Ruby fallback, env
  # var unset) can set `HYPERION_H2_NATIVE_HPACK=off` (or
  # `0`/`false`/`no`/`off`/`ruby`). `HYPERION_H2_NATIVE_HPACK=1`
  # / unset preserves the 2.5-B `auto` behavior. `=cglue`/`=v2`
  # forces the corresponding native sub-path.
  #
  # When OFF (env-overridden): `protocol-http2`'s pure-Ruby HPACK
  # Compressor / Decompressor handles everything as in 2.0.0–2.4.x.
  @h2_native_mode          = resolve_h2_native_hpack_state
  @h2_native_hpack_enabled = @h2_codec_available && @h2_native_mode != :off
  apply_h2_cglue_gate(@h2_native_mode)
  @h2_codec_native = @h2_native_hpack_enabled # back-compat ivar — preserved for codec_native? readers
  # 2.10-G — opt-in connection-setup timing instrumentation. When set,
  # `serve` captures four monotonic timestamps per connection:
  #
  #   t0 — entry to `serve` (post-TLS, post-ALPN — the socket is already
  #        the negotiated h2 SSLSocket by the time the handler sees it)
  #   t1 — `read_connection_preface` returned (server-side SETTINGS
  #        encoded + handed to the framer; client preface fully read)
  #   t2_encode — first stream's HEADERS frame finished encoding (bytes
  #               sit in the writer queue)
  #   t2_wire   — writer fiber finished its first `socket.write` (bytes
  #               on the wire)
  #
  # When the connection's first response completes, the handler emits
  # a single `'h2 first-stream timing'` info line with t0→t1, t1→t2_encode,
  # t2_encode→t2_wire deltas in milliseconds. Off by default (zero hot-path
  # cost when disabled — a single ivar read per stream branch). Used by
  # 2.10-G to root-cause Hyperion's flat ~40 ms first-stream max-latency.
  @h2_timing_enabled = env_flag_enabled?('HYPERION_H2_TIMING')
  # 2.11-A — resolve the dispatch worker pool size once at handler
  # construction so every `serve` call uses the same value (instead
  # of re-parsing ENV per connection on the hot path). Cached as an
  # ivar; bench/diagnostics can read it via the spec seam.
  @dispatch_pool_size = resolve_dispatch_pool_size
  record_codec_boot_state
end

Instance Method Details

#apply_h2_cglue_gate(state) ⇒ Object

2.11-B: flip the global `H2Codec.cglue_disabled` gate based on the resolved native-mode state. The gate is per-process state (the codec module is a singleton), so it is reset on every handler construction; otherwise a test that booted with `=v2` would leak the disable into a subsequent default-mode handler.



# File 'lib/hyperion/http2_handler.rb', line 896

def apply_h2_cglue_gate(state)
  Hyperion::H2Codec.cglue_disabled = (state == :v2)
end

#codec_available? ⇒ Boolean

True when the Rust crate loaded successfully, regardless of whether the operator opted in to wiring it into the wire path. Useful for diagnostics/health endpoints that want to surface “native is available but currently disabled”.

Returns:

  • (Boolean)


# File 'lib/hyperion/http2_handler.rb', line 989

def codec_available?
  @h2_codec_available
end

#codec_native? ⇒ Boolean

Read-only accessor used by tests and diagnostics. Returns true when the `Hyperion::H2Codec` Rust extension loaded successfully and `HYPERION_H2_NATIVE_HPACK` does not resolve to `:off`, in which case `build_server` wires the native adapter onto every new connection’s `encode_headers` / `decode_headers` boundary. In 2.2.0 the default was false (opt-in, requiring `HYPERION_H2_NATIVE_HPACK=1`); the comments in `#initialize` record the 2.5-B switch to on-by-default. See `#initialize` for the rationale, and CHANGELOG/docs for the bench numbers that originally pinned the default off.

Returns:

  • (Boolean)


# File 'lib/hyperion/http2_handler.rb', line 981

def codec_native?
  @h2_native_hpack_enabled
end

#describe_codec_mode(cglue_active:, cglue_requested_unavailable:) ⇒ Object

2.11-B — boot-log mode descriptor (extracted for clarity since the matrix of native_mode × cglue_available × cglue_active grew past the point where an inline conditional was readable).



# File 'lib/hyperion/http2_handler.rb', line 949

def describe_codec_mode(cglue_active:, cglue_requested_unavailable:)
  if !@h2_native_hpack_enabled
    if @h2_codec_available
      'fallback (protocol-http2 / pure Ruby HPACK) — native available but opted out via HYPERION_H2_NATIVE_HPACK=off'
    else
      'fallback (protocol-http2 / pure Ruby HPACK) — native unavailable'
    end
  elsif cglue_active && @h2_native_mode == :cglue
    'native (Rust v3 / CGlue, forced) — HPACK on hot path, no Fiddle per call'
  elsif cglue_active
    # 2.11-B confirmed cglue as the firm default — the bench-measured
    # delta vs the v2 (Fiddle) path is +33-43% on Rails-shape h2
    # responses, which is the actual win the 2.5-B "+18% native vs
    # ruby" headline was capturing (v2 alone is +1-5%, basically
    # noise vs the ruby fallback at this header count).
    'native (Rust v3 / CGlue, default since 2.11-B) — HPACK on hot path, no Fiddle per call'
  elsif @h2_native_mode == :v2
    'native (Rust v2 / Fiddle, forced) — HPACK on hot path, Fiddle marshalling per call'
  elsif cglue_requested_unavailable
    'native (Rust v2 / Fiddle) — CGlue requested via HYPERION_H2_NATIVE_HPACK=cglue but unavailable, fell back'
  else
    'native (Rust v2 / Fiddle) — HPACK on hot path, Fiddle marshalling per call'
  end
end

#env_flag_enabled?(name) ⇒ Boolean

Read an env-var flag with the usual truthiness rules (any of 1/true/yes/on, case-insensitive). Anything else → false.

Returns:

  • (Boolean)


# File 'lib/hyperion/http2_handler.rb', line 855

def env_flag_enabled?(name)
  v = ENV[name]
  return false if v.nil? || v.empty?

  %w[1 true yes on].include?(v.downcase)
end

#record_codec_boot_state ⇒ Object

2.0.0 Phase 6b: emit a single-shot boot log line per process describing the codec selection. Operators reading the boot log see whether the native HPACK path is in play. Idempotent across multiple Http2Handler constructions in the same process.



# File 'lib/hyperion/http2_handler.rb', line 904

def record_codec_boot_state
  return if Hyperion::Http2Handler.instance_variable_get(:@codec_state_logged)

  Hyperion::Http2Handler.instance_variable_set(:@codec_state_logged, true)
  # 2.11-B — `cglue_active` gates on the operator-controllable
  # `cglue_active?` predicate (was `cglue_available?` pre-2.11-B).
  # When the operator sets `=v2` we want the boot log to read
  # `cglue_active: false` even though the C glue did install
  # successfully — the bench harness inspects this field to
  # differentiate the variants.
  cglue_active = @h2_native_hpack_enabled && Hyperion::H2Codec.cglue_active?
  cglue_requested_unavailable = @h2_native_mode == :cglue &&
                                @h2_native_hpack_enabled &&
                                !Hyperion::H2Codec.cglue_available?
  mode = describe_codec_mode(cglue_active: cglue_active,
                             cglue_requested_unavailable: cglue_requested_unavailable)
  native_mode_log = if !@h2_native_hpack_enabled
                      @h2_native_mode == :off ? 'off' : 'native-disabled'
                    elsif cglue_requested_unavailable
                      'cglue-requested-unavailable'
                    else
                      @h2_native_mode.to_s
                    end
  @logger.info do
    {
      message: 'h2 codec selected',
      mode: mode,
      native_available: @h2_codec_available,
      native_enabled: @h2_native_hpack_enabled,
      native_mode: native_mode_log,
      cglue_active: cglue_active,
      hpack_path: if @h2_native_hpack_enabled
                    cglue_active ? 'native-v3' : 'native-v2'
                  else
                    'pure-ruby'
                  end
    }
  end
  @metrics.increment(:h2_codec_native_selected) if @h2_native_hpack_enabled
  @metrics.increment(:h2_codec_fallback_selected) unless @h2_native_hpack_enabled
end

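#resolve_dispatch_pool_size ⇒ Object

Resolves the per-connection dispatch pool size from `HYPERION_H2_DISPATCH_POOL`. Unset, blank, malformed, or non-positive values yield `DISPATCH_POOL_DEFAULT`; larger values are capped at `DISPATCH_POOL_MAX`.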



# File 'lib/hyperion/http2_handler.rb', line 841

def resolve_dispatch_pool_size
  raw = ENV['HYPERION_H2_DISPATCH_POOL']
  return DISPATCH_POOL_DEFAULT if raw.nil? || raw.strip.empty?

  n = Integer(raw.strip, 10)
  return DISPATCH_POOL_DEFAULT unless n.positive?

  [n, DISPATCH_POOL_MAX].min
rescue ArgumentError, TypeError
  DISPATCH_POOL_DEFAULT
end
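
To make the fallback and ceiling rules concrete, the sketch below applies the same logic to a handful of raw values. `PoolSizeSketch` is a hypothetical wrapper that takes the raw string as an argument so the example does not depend on the process environment.

```ruby
module PoolSizeSketch
  DEFAULT = 4
  MAX     = 16

  # Same rules as the method above: blank or malformed input falls back to
  # the default, non-positive input falls back too, large input is capped.
  def self.resolve(raw)
    return DEFAULT if raw.nil? || raw.strip.empty?

    n = Integer(raw.strip, 10)
    return DEFAULT unless n.positive?

    [n, MAX].min
  rescue ArgumentError, TypeError
    DEFAULT
  end
end

[nil, '', '8', ' 12 ', '64', '0', '-3', 'lots'].each do |raw|
  puts "#{raw.inspect} -> #{PoolSizeSketch.resolve(raw)}"
end
# nil -> 4, "" -> 4, "8" -> 8, " 12 " -> 12,
# "64" -> 16, "0" -> 4, "-3" -> 4, "lots" -> 4
```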

#resolve_h2_native_hpack_state ⇒ Object

2.11-B: resolve the operator-requested native-mode state from `HYPERION_H2_NATIVE_HPACK`.

Returns one of:

* `:auto`  - native enabled, prefer cglue if available
             (unset / `1` / `true` / `yes` / `on` / `auto`)
* `:cglue` - native enabled, force cglue (`cglue` / `v3`);
             warn-fallback to v2 if cglue is unavailable, and the
             native_mode log marker surfaces the divergence to the
             operator
* `:v2`    - native enabled, force Fiddle (`v2` / `fiddle`); skips
             cglue even if available. This is the bench-isolation
             knob the 2.11-B Rails-shape harness needs
* `:off`   - ruby fallback (`0` / `false` / `no` / `off` / `ruby`)

Values are matched case-insensitively.

Unknown values fall through to `:auto` rather than crashing the connection, the same forgiving-default policy as the pre-2.11-B `resolve_h2_native_hpack_default`.



# File 'lib/hyperion/http2_handler.rb', line 879

def resolve_h2_native_hpack_state
  v = ENV['HYPERION_H2_NATIVE_HPACK']
  return :auto if v.nil? || v.empty?

  lc = v.downcase
  return :off   if %w[0 false no off ruby].include?(lc)
  return :cglue if %w[cglue v3].include?(lc)
  return :v2    if %w[v2 fiddle].include?(lc)

  :auto
end
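
A quick way to check the mapping is to run the same rules over sample values. `NativeModeSketch` is a hypothetical wrapper that takes the value as an argument instead of reading `ENV`.

```ruby
module NativeModeSketch
  # Mirrors the method above, with the env value passed in explicitly.
  def self.resolve(value)
    return :auto if value.nil? || value.empty?

    lc = value.downcase
    return :off   if %w[0 false no off ruby].include?(lc)
    return :cglue if %w[cglue v3].include?(lc)
    return :v2    if %w[v2 fiddle].include?(lc)

    :auto
  end
end

[nil, '1', 'OFF', 'ruby', 'cglue', 'v3', 'Fiddle', 'banana'].each do |value|
  puts "#{value.inspect} -> #{NativeModeSketch.resolve(value).inspect}"
end
# nil -> :auto, "1" -> :auto, "OFF" -> :off, "ruby" -> :off,
# "cglue" -> :cglue, "v3" -> :cglue, "Fiddle" -> :v2, "banana" -> :auto
```

Truthy values such as `1` are not matched explicitly; they reach `:auto` through the same fall-through branch as unknown values.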

#serve(socket) ⇒ Object



# File 'lib/hyperion/http2_handler.rb', line 993

def serve(socket)
  @metrics.increment(:connections_accepted)
  @metrics.increment(:connections_active)

  # Per-connection outbound coordination. Encoder fibers enqueue bytes;
  # the writer fiber owns the real socket and drains. See class docstring.
  writer_ctx   = WriterContext.new
  send_io      = SendQueueIO.new(socket, writer_ctx)
  framer       = ::Protocol::HTTP2::Framer.new(send_io)
  server       = build_server(framer)

  # 2.10-G — connection entry timestamp. Captured before any framing
  # work so the t0→t1 delta isolates "preface exchange + initial
  # SETTINGS round-trip" from any pre-handler scheduling delay.
  writer_ctx.t0_serve_entry = monotonic_now if @h2_timing_enabled

  task = ::Async::Task.current

  # 2.11-A — extract the peer address BEFORE the preface exchange.
  # Two wins: (1) the lookup runs in parallel with the writer fiber
  # picking up the first scheduler slot, and (2) the first stream's
  # dispatch fiber doesn't pay this `peeraddr` syscall on its hot
  # path. The address is then captured by the worker closures
  # below.
  peer_addr = peer_address(socket)

  # Spawn the dedicated writer fiber BEFORE the preface exchange.
  # `Server#read_connection_preface` writes the server's SETTINGS frame
  # via the framer; if the writer isn't running, those bytes sit in the
  # queue. Spawning first guarantees they flush as soon as the scheduler
  # ticks, avoiding any pathological deadlock where a client implementation
  # waits for our SETTINGS before sending more frames.
  writer_task = task.async { run_writer_loop(socket, writer_ctx) }

  # 2.11-A — pre-spawn the dispatch worker pool BEFORE the preface
  # exchange. Workers park on `writer_ctx.dispatch_queue.dequeue`;
  # by the time the first client HEADERS frame arrives the workers
  # are already in the scheduler's runnable set. The first stream
  # is just an enqueue + dequeue (microseconds) instead of a
  # `task.async {}` cold spawn (was the dominant cost in the t1→t2_enc
  # bucket per the 2.10-G timing breakdown).
  warmup_dispatch_pool!(task, writer_ctx, peer_addr: peer_addr,
                                          pool_size: @dispatch_pool_size)

  server.read_connection_preface(initial_settings_payload)
  writer_ctx.t1_preface_done = monotonic_now if @h2_timing_enabled

  # Track ad-hoc per-stream dispatch fibers (spilled when the pool is
  # saturated). The pool handles the common case; we only fall back
  # to `task.async {}` when more streams arrive than warm workers.
  overflow_tasks = []

  until server.closed?
    ready_ids = []
    server.read_frame do |frame|
      ready_ids << frame.stream_id if frame.stream_id.positive?
    end

    ready_ids.uniq.each do |sid|
      stream = server.streams[sid]
      next unless stream.is_a?(RequestStream)
      # 2.13-D — `dispatchable?` covers both unary (request_complete on
      # END_STREAM) and gRPC streaming-input (dispatch_ready on first
      # HEADERS). Pre-2.13-D this was a `request_complete` check.
      next unless stream.dispatchable?
      next if stream.closed?
      next if stream.instance_variable_get(:@hyperion_dispatched)

      # Mark before spawning so we never dispatch the same stream twice
      # if subsequent frames (e.g. RST_STREAM races) arrive.
      stream.instance_variable_set(:@hyperion_dispatched, true)

      # 2.11-A — hand the stream to a warm worker via the dispatch
      # queue. We use a simple "queue is empty" probe to decide:
      #
      #   * Empty queue ⇒ at least one worker is parked on
      #     `dequeue`; the enqueue+dequeue handoff is microseconds
      #     and we avoid a `task.async {}` cold spawn. This is the
      #     hot path for the FIRST stream of a fresh connection
      #     (the case 2.11-A is targeting).
      #   * Non-empty queue ⇒ every parked worker has already
      #     pulled a stream; another worker won't pick this up
      #     until one finishes. To avoid head-of-line blocking
      #     behind the warmup pool, fall back to `task.async {}`.
      #     The overflow fiber re-uses `dispatch_stream` so the
      #     dispatch contract is identical between pool and
      #     overflow paths. Concurrency is never artificially
      #     capped; the operator-facing knob is
      #     `h2.max_concurrent_streams`.
      if writer_ctx.dispatch_queue.size.zero?
        writer_ctx.dispatch_queue.enqueue(stream)
      else
        overflow_tasks << task.async do
          dispatch_stream(stream, writer_ctx, peer_addr)
        end
      end
    end
  end

  # Drain in-flight stream dispatches before we close the socket.
  overflow_tasks.each do |t|
    t.wait
  rescue StandardError
    nil
  end
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError, OpenSSL::SSL::SSLError
  # Peer disconnect — nothing to do.
rescue ::Protocol::HTTP2::GoawayError, ::Protocol::HTTP2::ProtocolError, ::Protocol::HTTP2::HandshakeError
  # Protocol-level error — protocol-http2 has already emitted GOAWAY.
rescue StandardError => e
  @logger.error do
    {
      message: 'h2 connection error',
      error: e.message,
      error_class: e.class.name,
      backtrace: (e.backtrace || []).first(10).join(' | ')
    }
  end
ensure
  # Coordinated shutdown: flag the writer, signal it, wait for the final
  # drain, then close the real socket. Order matters — closing the
  # socket before the writer drains would discard final RST_STREAM /
  # GOAWAY / END_STREAM frames in the queue.
  if writer_ctx
    # 2.11-A — close the dispatch queue so any pre-spawned workers
    # parked on `dequeue` fall through (Async::Queue#dequeue returns
    # nil after close). Do this BEFORE waiting on the writer so
    # pool workers can drain their in-flight stream dispatches and
    # release the encode mutex; otherwise the writer might park
    # waiting for bytes that the dispatch worker never gets to
    # encode.
    begin
      writer_ctx.dispatch_queue.close unless writer_ctx.dispatch_queue.closed?
    rescue StandardError
      nil
    end
    writer_ctx.shutdown!
    begin
      writer_task&.wait
    rescue StandardError
      nil
    end
    # 2.10-G — emit one info-level timing line per connection when the
    # opt-in instrumentation is enabled and we collected a full set of
    # samples (a connection that died before serving any stream lacks
    # t2_first_encode / t2_first_wire and gets skipped — there's no
    # first-stream signal to report).
    log_h2_first_stream_timing(writer_ctx) if @h2_timing_enabled
  end
  @metrics.decrement(:connections_active)
  socket.close unless socket.closed?
end
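
The pool-versus-overflow decision inside the frame loop reduces to a single probe on the dispatch queue. The sketch below isolates that probe. It is illustrative only: `route_stream` is a hypothetical helper, a stdlib `Queue` stands in for the Async queue, and no worker consumes the queue unless the example pops from it explicitly.

```ruby
# Returns :pool when the stream would be enqueued for a warm worker and
# :overflow when the real code would spawn an ad-hoc dispatch fiber.
def route_stream(queue, stream_id)
  if queue.size.zero?
    queue << stream_id # picked up by whichever worker dequeues next
    :pool
  else
    :overflow          # real code: task.async { dispatch_stream(...) }
  end
end

queue  = Queue.new
routes = [1, 3, 5].map { |sid| route_stream(queue, sid) }

queue.pop # simulate a worker picking up stream 1
routes << route_stream(queue, 7)

p routes # prints [:pool, :overflow, :overflow, :pool]
```

The probe looks only at queued streams, not at how many workers are busy, so it is a cheap heuristic rather than an exact count of idle workers.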