Class: Hyperion::Http2Handler

Inherits:
Object
Defined in:
lib/hyperion/http2_handler.rb

Overview

Real HTTP/2 dispatch driven by `protocol-http2`.

Each TLS connection that negotiates `h2` via ALPN ends up here. We frame the socket, read the connection preface, and then drive a frame loop on the connection's fiber: it reads one frame at a time and lets `protocol-http2` update its connection and stream state machines. As soon as a client stream finishes its request half (state `:half_closed_remote`, detected via `end_stream?`), we hand the stream off to a sibling fiber for dispatch, so a slow handler does not block other streams on the same connection.
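The hand-off described above can be sketched with the standard library alone. This is an illustration, not Hyperion's code: `Frame`, `gate`, and `completed` are hypothetical names, and threads stand in for the sibling fibers. The handler for stream 1 is made "slow" by blocking until stream 3 has finished, which only works because each completed stream is dispatched concurrently.

```ruby
# Hypothetical frame record: only the fields this sketch needs.
Frame = Struct.new(:stream_id, :end_stream)

frames = [
  Frame.new(1, false), # stream 1: more request frames to come
  Frame.new(1, true),  # stream 1: request half finished
  Frame.new(3, true)   # stream 3: request half finished
]

gate       = Queue.new # lets stream 3 release the "slow" handler on stream 1
completed  = Queue.new # records the order in which handlers finish
dispatched = {}        # stream id => true once a worker has been spawned
workers    = []

# The "frame loop": look at one frame at a time, dispatch on end-of-stream.
frames.each do |frame|
  next unless frame.end_stream
  next if dispatched[frame.stream_id]

  # Mark before spawning so a later frame cannot dispatch the stream twice.
  dispatched[frame.stream_id] = true
  workers << Thread.new(frame.stream_id) do |sid|
    gate.pop if sid == 1    # slow handler: waits until stream 3 is done
    completed << sid
    gate << :go if sid == 3 # stream 3 finishing unblocks stream 1
  end
end

workers.each(&:join)
completion_order = Array.new(completed.size) { completed.pop }
```

If the loop ran each handler inline instead of spawning a worker, stream 1 would wait forever and stream 3 would never be reached.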

All framer writes (HEADERS, DATA, RST_STREAM, GOAWAY) are serialized through a single connection-scoped Mutex (`send_mutex`, created in `#serve` and passed to each stream dispatch). The OpenSSL::SSL::SSLSocket underneath is not safe to drive from two fibers concurrently, and the HPACK encoder in `protocol-http2` keeps state across HEADERS frames, so every send must go through that mutex.
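A minimal sketch of why the writes are serialized, assuming a toy length-prefixed framer rather than the real `protocol-http2` framer. `SketchFramer` and `parse_frames` are hypothetical, and threads stand in for fibers. Each frame is written in two separate IO calls, so without the mutex two writers could interleave a header from one frame with the payload of another.

```ruby
require 'stringio'

# Hypothetical stand-in for a framer: a 4-byte length prefix, then the payload.
class SketchFramer
  def initialize(io)
    @io = io
  end

  def write_frame(payload)
    @io.write([payload.bytesize].pack('N'))
    Thread.pass # invite a context switch between the two writes
    @io.write(payload)
  end
end

# Splits the byte stream back into payloads using the length prefixes.
def parse_frames(bytes)
  frames = []
  offset = 0
  while offset < bytes.bytesize
    length = bytes.byteslice(offset, 4).unpack1('N')
    frames << bytes.byteslice(offset + 4, length)
    offset += 4 + length
  end
  frames
end

io         = StringIO.new(String.new(encoding: Encoding::BINARY))
framer     = SketchFramer.new(io)
send_mutex = Mutex.new # one mutex per connection, shared by all writers

writers = 4.times.map do |stream|
  Thread.new do
    25.times do |n|
      # Every write goes through the same mutex, so frames stay intact.
      send_mutex.synchronize { framer.write_frame("stream-#{stream}-frame-#{n}") }
    end
  end
end
writers.each(&:join)

frames = parse_frames(io.string)
```

Holding the mutex for the whole frame, not for each individual IO call, is what keeps a frame's header and payload adjacent on the wire.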

Flow control: `RequestStream#window_updated` overrides the `protocol-http2` default to fan a notification out to any fiber blocked in `send_body` waiting for the remote peer's flow-control window to grow. The body writer chunks the response payload by the per-stream available frame size and yields on the notification when the window is exhausted, so large bodies never trip a FlowControlError.
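The wait-and-chunk behaviour can be sketched as follows. This is a simplified model under stated assumptions, not the `RequestStream` implementation: `SketchStream` is hypothetical, a `ConditionVariable` stands in for the fiber notification, and the window and frame sizes are tiny so the blocking path is exercised.

```ruby
# Hypothetical model of a stream's send side with a flow-control window.
class SketchStream
  attr_reader :sent_chunks

  def initialize(window:, max_frame_size:)
    @window         = window
    @max_frame_size = max_frame_size
    @lock           = Mutex.new
    @window_grew    = ConditionVariable.new
    @sent_chunks    = []
  end

  # Called when the peer grants more credit (a WINDOW_UPDATE in HTTP/2).
  def window_updated(increment)
    @lock.synchronize do
      @window += increment
      @window_grew.broadcast # wake every writer waiting for credit
    end
  end

  # Sends the body in chunks no larger than the window or the frame size,
  # sleeping while the window is exhausted instead of overrunning it.
  def send_body(body)
    offset = 0
    while offset < body.bytesize
      chunk = @lock.synchronize do
        @window_grew.wait(@lock) while @window.zero?
        size = [@window, @max_frame_size, body.bytesize - offset].min
        @window -= size
        body.byteslice(offset, size)
      end
      @sent_chunks << chunk
      offset += chunk.bytesize
    end
  end
end

stream = SketchStream.new(window: 10, max_frame_size: 8)
body   = 'x' * 30
writer = Thread.new { stream.send_body(body) }

# The "peer" grants 20 more bytes of credit in two updates.
2.times do
  sleep 0.01
  stream.window_updated(10)
end
writer.join
```

The exact chunk boundaries depend on when the updates arrive, but no chunk ever exceeds the frame size or the credit available at the time it is sent.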

Defined Under Namespace

Classes: RequestStream

Instance Method Summary

Constructor Details

#initialize(app:, thread_pool: nil) ⇒ Http2Handler

Returns a new instance of Http2Handler.



# File 'lib/hyperion/http2_handler.rb', line 215

def initialize(app:, thread_pool: nil)
  @app         = app
  @thread_pool = thread_pool
  @metrics     = Hyperion.metrics
  @logger      = Hyperion.logger
end

Instance Method Details

#serve(socket) ⇒ Object

Serves a single HTTP/2 connection on `socket`: reads frames until the connection closes, dispatches each completed request stream on its own fiber, drains in-flight dispatches, and always closes the socket on exit.



# File 'lib/hyperion/http2_handler.rb', line 222

def serve(socket)
  @metrics.increment(:connections_accepted)
  @metrics.increment(:connections_active)
  framer = ::Protocol::HTTP2::Framer.new(socket)
  server = build_server(framer)
  server.read_connection_preface

  # Extract once — the same TCP peer drives every stream on this conn.
  peer_addr = peer_address(socket)

  # All framer writes (HEADERS / DATA / RST_STREAM / GOAWAY) must be
  # serialized: the underlying SSLSocket is not safe across fibers, and
  # the HPACK encoder is also stateful. The connection's own frame loop
  # uses this mutex too — see `dispatch_stream` and `send_body`.
  send_mutex = ::Mutex.new

  task = ::Async::Task.current

  # Track in-flight per-stream dispatch fibers so we can drain them on
  # connection close.
  stream_tasks = []

  until server.closed?
    ready_ids = []
    server.read_frame do |frame|
      ready_ids << frame.stream_id if frame.stream_id.positive?
    end

    ready_ids.uniq.each do |sid|
      stream = server.streams[sid]
      next unless stream.is_a?(RequestStream)
      next unless stream.request_complete
      next if stream.closed?
      next if stream.instance_variable_get(:@hyperion_dispatched)

      # Mark before spawning so we never dispatch the same stream twice
      # if subsequent frames (e.g. RST_STREAM races) arrive.
      stream.instance_variable_set(:@hyperion_dispatched, true)

      stream_tasks << task.async do
        dispatch_stream(stream, send_mutex, peer_addr)
      end
    end
  end

  # Drain in-flight stream dispatches before we close the socket.
  stream_tasks.each do |t|
    t.wait
  rescue StandardError
    nil
  end
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, IOError, OpenSSL::SSL::SSLError
  # Peer disconnect — nothing to do.
rescue ::Protocol::HTTP2::GoawayError, ::Protocol::HTTP2::ProtocolError, ::Protocol::HTTP2::HandshakeError
  # Protocol-level error — protocol-http2 has already emitted GOAWAY.
rescue StandardError => e
  @logger.error do
    {
      message: 'h2 connection error',
      error: e.message,
      error_class: e.class.name,
      backtrace: (e.backtrace || []).first(10).join(' | ')
    }
  end
ensure
  @metrics.decrement(:connections_active)
  socket.close unless socket.closed?
end
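
The drain step at the end of `#serve` waits on every in-flight dispatch and rescues per task, so one failed handler does not stop the remaining ones from being awaited. A rough standard-library analogue, using threads in place of Async tasks (`finished` and `failures` are names invented for this sketch):

```ruby
finished = Queue.new

tasks = [
  Thread.new { finished << :a },
  Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet in the sketch
    raise 'handler blew up'
  end,
  Thread.new { finished << :b }
]

failures = 0
tasks.each do |t|
  t.join # re-raises the thread's exception in the caller
rescue StandardError
  failures += 1 # swallow it and keep draining the remaining tasks
end

all_done = tasks.none?(&:alive?)
```

Placing the `rescue` inside the block, rather than around the whole `each`, is what lets the loop continue past the failing task.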