Class: Hyperion::Connection

Inherits: Object
Defined in:
lib/hyperion/connection.rb

Overview

Drives one TCP connection through its lifecycle: read until the headers and body are complete, parse, dispatch through the Rack adapter, write the response, close. Phase 2 adds fiber scheduling and keep-alive; the public surface (#serve) is stable.

Phase 1 assumes blocking I/O: `socket.read(N)` blocks until N bytes arrive or EOF, so `break if chunk.nil? || chunk.empty?` correctly detects EOF in read_request. Phase 2 (fiber scheduler) introduces non-blocking semantics where short reads and EAGAIN must be distinguished from EOF — read_request will need to handle IO::WaitReadable explicitly at that point.
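A minimal sketch of that Phase 2 shape (hypothetical, not current code): read_nonblock raises IO::WaitReadable on a would-block read, which must park the fiber and retry rather than be treated as EOF.

require 'io/wait' # IO#wait_readable (core on recent Rubies)

def read_chunk(socket)
  socket.read_nonblock(READ_CHUNK)
rescue IO::WaitReadable
  socket.wait_readable # parks this fiber under Fiber.set_scheduler
  retry
rescue EOFError
  nil # peer closed its write side: genuine EOF, distinct from EAGAIN
end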

Constant Summary

READ_CHUNK =
16 * 1024
MAX_HEADER_BYTES =
64 * 1024
MAX_BODY_BYTES =

16 MiB cap. Phase 5 introduces streaming bodies.

16 * 1024 * 1024
HEADER_TERM =
"\r\n\r\n"
TIMEOUT_SENTINEL =
:__hyperion_read_timeout__
DEADLINE_SENTINEL =
:__hyperion_request_deadline__
OVERSIZED_BODY_SENTINEL =
:__hyperion_oversized_body__
IDLE_KEEPALIVE_TIMEOUT_SECONDS =
5
INBUF_INITIAL_CAPACITY =

Phase 2b (1.7.1) — per-Connection pre-sized scratch buffer for the read accumulator. Most HTTP/1.1 request lines + headers fit in a few hundred bytes; 8 KiB covers ~99% of legitimate traffic without ever re-allocating. We reuse the same String across keep-alive requests on the same connection (`String#clear` between requests preserves capacity). Requests larger than 8 KiB still parse correctly — `String#<<` grows the underlying buffer transparently — they just pay the realloc the first time, same as the pre-1.7.1 behaviour. (A standalone sketch of the reuse pattern follows the constant list.)

8 * 1024
REJECT_413_PAYLOAD_TOO_LARGE =

Pre-built canned 413 — body is small + plain text, connection forced closed. Reused across every oversized-CL rejection so the DOS-defense path stays allocation-free and never has to dip into ResponseWriter (which would require a full Rack-style headers hash for an error we can answer with frozen bytes).

(+"HTTP/1.1 413 Payload Too Large\r\n" \
"content-type: text/plain\r\n" \
"content-length: 18\r\n" \
"connection: close\r\n" \
"\r\n" \
"payload too large\n").freeze
REJECT_503_PER_CONN_OVERLOAD =

2.3-B per-conn fairness 503. The connection stays alive (no `connection: close` header) so the upstream peer can retry the request in 1s — nginx-friendly. Body is small + plain text + frozen so the reject path stays allocation-free on the hot path.

(+"HTTP/1.1 503 Service Unavailable\r\n" \
"content-type: text/plain\r\n" \
"content-length: 31\r\n" \
"retry-after: 1\r\n" \
"\r\n" \
"per-connection overload, retry\n").freeze
REQUEST_DURATION_BUCKETS =

2.4-C — histogram bucket edges for the per-route request duration histogram. Powers-of-5 spread covers 1ms to 10s, the realistic range for any HTTP-served workload. Frozen so the same Array is reused across every Connection (cheaper hist registration, no per-conn allocation).

[0.001, 0.005, 0.025, 0.1, 0.5, 2.5, 10.0].freeze
REQUEST_DURATION_HISTOGRAM =
:hyperion_request_duration_seconds
STATUS_CLASS =

Pre-bucketed status-class strings. The lookup `STATUS_CLASS[status / 100]` avoids a per-request `"#{n}xx"` string interpolation.

%w[0xx 1xx 2xx 3xx 4xx 5xx 6xx 7xx 8xx 9xx].each(&:freeze).freeze
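A standalone illustration of the INBUF_INITIAL_CAPACITY reuse pattern described above (plain Ruby, outside Hyperion):

buf = String.new(capacity: 8 * 1024, encoding: Encoding::ASCII_8BIT)
buf << "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" # fits within capacity, no realloc
buf.clear                                            # length drops to 0, capacity is retained
buf << "GET /next HTTP/1.1\r\n\r\n"                  # next request reuses the warm buffer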

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil, log_requests: nil, max_body_bytes: MAX_BODY_BYTES, runtime: nil, max_in_flight_per_conn: nil, path_templater: nil, route_table: nil) ⇒ Connection

Returns a new instance of Connection.



# File 'lib/hyperion/connection.rb', line 77

def initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil,
               log_requests: nil, max_body_bytes: MAX_BODY_BYTES, runtime: nil,
               max_in_flight_per_conn: nil, path_templater: nil, route_table: nil)
  @parser         = parser
  @writer         = writer
  @thread_pool    = thread_pool
  @max_body_bytes = max_body_bytes
  # 2.3-B: per-conn fairness cap. nil disables the check entirely
  # (the hot path stays branchless). Positive integer sets the
  # in-flight ceiling. The counter + dedup-warn flag live as ivars
  # so a single Connection's lifetime sees one warn at most, not
  # one per rejected request.
  @max_in_flight_per_conn = max_in_flight_per_conn
  @in_flight              = 0
  @in_flight_mutex        = Mutex.new if max_in_flight_per_conn
  @overload_warned        = false
  # 1.7.0: explicit Runtime injection. When the caller passes
  # `runtime:`, that runtime is the sole source of metrics + logger
  # for this connection — no implicit fallback to module-level
  # singletons. When omitted, fall back to `Runtime.default` so
  # legacy callers keep working untouched.
  #
  # We still cache the metrics/logger refs in ivars (vs reading
  # `runtime.metrics` per request) so the hot path doesn't pay a
  # method-dispatch per increment. Long-lived keep-alive connections
  # therefore see a Runtime swap only at construction — that's a
  # 1.7.0 limitation; 2.0 drops the singleton entirely and the
  # ivar cache becomes the only path.
  if runtime
    @runtime = runtime
    @metrics = runtime.metrics
    @logger  = runtime.logger
  else
    # No explicit runtime → keep the 1.6.x shape: ivars cache the
    # module-level accessors. This preserves stub seams used by
    # existing specs (`allow(Hyperion).to receive(:metrics)`) and
    # the `Hyperion.instance_variable_set(:@metrics, ...)` swap.
    @runtime = Hyperion::Runtime.default
    @metrics = Hyperion.metrics
    @logger  = Hyperion.logger
  end
  # Per-request access logging is ON by default (matches Puma+Rails
  # operator expectation). The hot path is optimised end-to-end: one
  # Process.clock_gettime per request, per-thread cached timestamp,
  # hand-rolled line builder, lock-free emit. Operator disables via
  # `--no-log-requests` or `HYPERION_LOG_REQUESTS=0`.
  @log_requests = if log_requests.nil?
                    # Per-Connection override absent → consult the
                    # Runtime's logging config (1.7.0+) which falls
                    # through to `Hyperion.log_requests?` (env +
                    # default ON).
                    Hyperion.log_requests?
                  else
                    log_requests
                  end
  # 2.4-C: cache the path-templater ref at construction. Reading it
  # via Hyperion::Metrics.default_path_templater per request would
  # add a method dispatch + a memo branch on every observation — we
  # keep the existing pattern of caching boot-time refs as ivars so
  # the per-request observe stays a single Hash lookup.
  @path_templater = path_templater || Hyperion::Metrics.default_path_templater
  # 2.10-D — direct-dispatch route table.  The hot-path lookup
  # is `@route_table&.lookup(method, path)` so the nil-default
  # case (no operator-registered direct routes — the
  # overwhelming majority of 2.x deployments) collapses to a
  # single `nil`-test before falling through to the Rack
  # adapter.  When `route_table:` is passed we honour the
  # explicit value (test seam / multi-tenant).  When omitted
  # AND the Hyperion::Server class is loaded, we resolve to
  # the process-wide singleton; ad-hoc Connection callers in
  # specs that don't load Server keep the nil fallback.
  @route_table = if route_table
                   route_table
                 elsif defined?(Hyperion::Server) && Hyperion::Server.respond_to?(:route_table)
                   Hyperion::Server.route_table
                 end
  register_request_duration_histogram!
end
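A hedged construction sketch using the kwargs above; my_runtime, socket, and app are assumed to exist in the caller's scope:

conn = Hyperion::Connection.new(
  runtime: my_runtime,        # sole source of metrics + logger (1.7.0+)
  max_in_flight_per_conn: 4,  # per-conn fairness cap; nil disables the check
  log_requests: false         # per-Connection override of the env/Runtime default
)
conn.serve(socket, app, max_request_read_seconds: 60)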

Instance Attribute Details

#response_dispatch_mode ⇒ Object

2.6-C — per-response dispatch-mode override. Reset to `nil` at the top of each request iteration; the Rack adapter sets this to `:inline_blocking` when it auto-detects a static-file body (`body.respond_to?(:to_path)`) or when the app explicitly opts in via `env['hyperion.dispatch_mode'] = :inline_blocking`. The response-write path reads it back here in `serve` and forwards the symbol to `ResponseWriter#write` so the writer can pick the blocking-sendfile variant.

The override is per-RESPONSE, NOT per-connection: the connection-wide dispatch mode (resolved at boot from `tls`, `async_io`, `thread_count`, ALPN) stays whatever the operator configured. Only the response-write loop downgrades.



# File 'lib/hyperion/connection.rb', line 194

def response_dispatch_mode
  @response_dispatch_mode
end
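For illustration, both triggers as a minimal Rack app (a sketch; a File body already responds to #to_path, so the adapter would auto-detect it even without the explicit override):

app = lambda do |env|
  env['hyperion.dispatch_mode'] = :inline_blocking # explicit per-response opt-in
  file = File.open('/var/www/blob.bin', 'rb')      # #to_path body: auto-detected anyway
  [200, { 'content-type' => 'application/octet-stream' }, file]
end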

#socket ⇒ Object (readonly)

2.1.0 (WS-1): the connection itself caches the live socket so that `hijack!` (called from inside the app, possibly on a thread-pool worker thread) can reach back and yield it. `@hijacked` is the flag that gates writer + cleanup behaviour after the app returns. Reset at the top of each request iteration: a keep-alive client that does NOT hijack on request N must still get the normal response path, and a hijack on request N+1 should not be observed during request N.



# File 'lib/hyperion/connection.rb', line 179

def socket
  @socket
end

Class Method Details

.c_chunked_available? ⇒ Boolean

Whether Hyperion::CParser.chunked_body_complete? is available. Probed lazily at first use; memoised in a class-level ivar to keep the per-request hot path branchless.

Returns:

  • (Boolean)


# File 'lib/hyperion/connection.rb', line 940

def self.c_chunked_available?
  return @c_chunked_available unless @c_chunked_available.nil?

  @c_chunked_available = defined?(::Hyperion::CParser) &&
                         ::Hyperion::CParser.respond_to?(:chunked_body_complete?)
end

.default_parser ⇒ Object

Default parser is the C-extension `CParser` when the extension is built; otherwise we fall back to the pure-Ruby `Parser`. Evaluated on each call because Ruby evaluates default kwargs at call time.



# File 'lib/hyperion/connection.rb', line 60

def self.default_parser
  defined?(::Hyperion::CParser) ? ::Hyperion::CParser.new : ::Hyperion::Parser.new
end
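Because Ruby re-evaluates default keyword arguments at each call, a C extension that becomes defined later is picked up by subsequent constructions:

conn_a = Hyperion::Connection.new # default_parser resolved at this call: Parser
# ... ::Hyperion::CParser becomes defined (hypothetical later load) ...
conn_b = Hyperion::Connection.new # kwarg re-evaluates: conn_b gets a CParser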

Instance Method Details

#hijack! ⇒ Object

Called by the Rack app (via `env['rack.hijack'].call`). Flips the `@hijacked` flag — Connection#serve checks this after `call_app` returns and skips the writer + the ensure-block close. Returns the raw socket IO so the app can speak any post-HTTP protocol on it.

Idempotent: a subsequent call returns the same socket without re-flipping (the flag is monotonic). Defensive — apps occasionally do `io = env['rack.hijack'].call; io2 = env['rack.hijack'].call` when chaining middleware.



# File 'lib/hyperion/connection.rb', line 209

def hijack!
  @hijacked = true
  Hyperion.metrics.increment(:rack_hijacks) if defined?(Hyperion) && Hyperion.respond_to?(:metrics)
  @socket
end

#hijack_buffered ⇒ Object

Bytes the connection had buffered past the parsed request boundary at the moment we entered the dispatch step (pipelined keep-alive carry, or — for an Upgrade — early bytes the client sent right after the headers, before they could see our 101 response). Returns a binary-encoded String (possibly empty). Captured fresh per request inside `serve` before `call_app` so reads from the socket past this point still go to the OS buffer; the carry is the application’s responsibility to drain.



# File 'lib/hyperion/connection.rb', line 223

def hijack_buffered
  @hijack_buffered ||= +''
end
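A hypothetical app-side sketch combining rack.hijack with the buffered carry; speak_protocol is an assumed helper, not part of Hyperion:

app = lambda do |env|
  io = env['rack.hijack'].call              # full hijack: the app now owns the socket
  early = env['hyperion.hijack_buffered']   # bytes already read past the request boundary
  speak_protocol(io, initial_bytes: early)  # hypothetical: drain the carry before reading io
  [-1, {}, []]                              # the tuple is ignored after a hijack
end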

#hijacked? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/connection.rb', line 196

def hijacked?
  @hijacked == true
end

#register_request_duration_histogram! ⇒ Object

2.4-C: register the per-route histogram family on this Connection’s metrics sink. Idempotent — `Metrics#register_histogram` no-ops on re-registration with the same shape. Called once per Connection so the histogram exists before the first observe.



# File 'lib/hyperion/connection.rb', line 160

def register_request_duration_histogram!
  @metrics.register_histogram(
    REQUEST_DURATION_HISTOGRAM,
    buckets: REQUEST_DURATION_BUCKETS,
    label_keys: %w[method path status]
  )
rescue StandardError
  # Histogram registration is observability — never block a Connection
  # from booting because the metrics sink misbehaved.
  nil
end
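A sketch of the per-request observation this registration enables. The label values follow the docstrings above (templated path, status class); the #observe call shape itself is an assumption about the Metrics API, not confirmed by this page:

@metrics.observe(
  REQUEST_DURATION_HISTOGRAM,
  0.0042, # request duration in seconds
  labels: { 'method' => 'GET', 'path' => '/users/:id', 'status' => '2xx' }
)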

#serve(socket, app, max_request_read_seconds: 60) ⇒ Object



# File 'lib/hyperion/connection.rb', line 227

def serve(socket, app, max_request_read_seconds: 60)
  request_count = 0
  @socket = socket
  @hijacked = false
  # 2.6-D — sticky flag set after each `:inline_blocking` response
  # so the next request iteration on the same keep-alive
  # connection can bypass the per-conn fairness admission check
  # (and the bookkeeping it carries).  See the
  # `skip_per_conn_fairness` branch in the request loop below.
  @last_response_was_static_inline_blocking = false
  # Phase 2b (1.7.1): pre-size the read accumulator once per connection
  # and reuse it across keep-alive requests. `String#clear` between
  # requests preserves the underlying capacity, so subsequent appends
  # don't pay the realloc tax. Pre-1.7.1 allocated a fresh `+''` per
  # request; per-connection reuse is a strict win because the previous
  # request's carry-over bytes (pipelined input) are copied into this
  # same buffer at the bottom of the loop instead of into a new String.
  @inbuf ||= String.new(capacity: INBUF_INITIAL_CAPACITY, encoding: Encoding::ASCII_8BIT)
  peer_addr = peer_address(socket)
  @metrics.increment(:connections_accepted)
  @metrics.increment(:connections_active)
  loop do
    # Per-request wallclock deadline. Captured fresh for every request so
    # long-lived keep-alive sessions with many small requests don't
    # falsely trip after the cumulative budget elapses.
    request_started_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) if max_request_read_seconds
    # 2.6-C — clear the per-response dispatch-mode override at the
    # top of every request iteration.  The Rack adapter sets it
    # *during* `app.call` (auto-detect on `to_path` body or
    # explicit `env['hyperion.dispatch_mode']` override) and the
    # writer reads it back; a keep-alive client whose request N
    # was static must NOT have request N+1 inherit the
    # `:inline_blocking` flag if request N+1's body is a streaming
    # response.
    @response_dispatch_mode = nil
    buffer = read_request(socket, @inbuf, deadline_started_at: request_started_clock,
                                          max_request_read_seconds: max_request_read_seconds,
                                          peer_addr: peer_addr)
    return unless buffer

    if buffer == TIMEOUT_SENTINEL
      # Idle timeout between keep-alive requests: close silently — the peer
      # never started a new request, so there's nothing to 408 about.
      @metrics.increment(:read_timeouts)
      return if request_count.positive?

      safe_write_error(socket, 408, 'Request Timeout')
      @metrics.increment_status(408)
      return
    end

    # Slowloris-style abort: deadline tripped during read. We've already
    # written the 408 (best-effort) inside read_request; close out here.
    return if buffer == DEADLINE_SENTINEL

    # DOS-defense: client declared a Content-Length larger than
    # max_body_bytes. We've already written the canned 413 + close inside
    # read_request, BEFORE reading any body bytes. Drop the connection.
    return if buffer == OVERSIZED_BODY_SENTINEL

    request, body_end = @parser.parse(buffer)
    # Carry over any pipelined trailing bytes for the next iteration. We
    # rewrite @inbuf in place — `replace` keeps the underlying capacity
    # allocation, so the next request starts with a warm 8 KiB buffer.
    #
    # 2.1.0 (WS-1): snapshot the carry BEFORE we collapse it back into
    # the read buffer. If the app full-hijacks this request, those
    # bytes are the application's responsibility (sent right after the
    # Upgrade headers, etc.) — exposed via `env['hyperion.hijack_buffered']`.
    # On the non-hijack hot path the snapshot is empty (no allocation
    # past the constant `EMPTY_BIN`) for keep-alive without pipelining.
    @hijack_buffered = if buffer.bytesize > body_end
                         buffer.byteslice(body_end,
                                          buffer.bytesize - body_end).b
                       else
                         EMPTY_BIN
                       end
    carry_into_inbuf!(buffer, body_end)
    request = enrich_with_peer(request, peer_addr) if peer_addr && request.peer_address.nil?

    @metrics.increment(:bytes_read, body_end)
    @metrics.increment(:requests_total)
    @metrics.increment(:requests_in_flight)
    # 2.4-C: capture start time for the per-route duration histogram.
    # Same Process.clock_gettime that the access-log path was already
    # paying — at default-ON log_requests the second call here is
    # avoided (we reuse `request_started_at`).
    request_started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # 2.10-D — direct-dispatch fast path.  Bypasses the Rack
    # adapter entirely (no env-hash build, no middleware chain,
    # no body-iteration overhead) on routes the operator
    # registered via `Hyperion::Server.handle(:GET, '/path',
    # handler)` or `.handle_static(...)`.  Lifecycle hooks
    # still fire so trace instrumentation works regardless of
    # dispatch shape.
    #
    # Lookup is O(1) (two Hash#[] hits) and the nil-default
    # case (no direct routes registered — the overwhelming
    # majority of deployments) collapses to one nil-test plus
    # one Hash#[] miss before falling through to the regular
    # path; cost on the regular path is < 1 us.
    if @route_table && (direct_handler = @route_table.lookup(request.method, request.path))
      dispatch_direct!(socket, request, direct_handler, request_started_at, peer_addr)
      request_count += 1
      break unless should_keep_alive_after_direct?(request)

      set_idle_timeout(socket)
      next
    end
    # 2.3-B per-conn fairness gate. Returns true when the slot was
    # reserved (caller must release in ensure), false when the cap
    # was hit and a 503 was emitted. nil cap → admit always (hot
    # path stays branchless).
    #
    # 2.6-D — skip the fairness check entirely on connections whose
    # previous response was `:inline_blocking` (auto-detected
    # static-file traffic).  Static streams are dominated by the
    # write phase, not concurrent app.call invocations, so the
    # per-conn fairness cap is dead weight here — its purpose is
    # to throttle dynamic-route concurrency on a single keep-alive
    # connection.  Static-asset connections (CDN origins, signed-
    # download responders) typically run a long sequence of
    # `to_path` responses; once the first one auto-detects, the
    # remaining requests skip the admit / release / metric trio.
    # The flag flips back to false the moment a non-static
    # response lands on the same connection.
    skip_per_conn_fairness = @last_response_was_static_inline_blocking
    if @max_in_flight_per_conn && !skip_per_conn_fairness && !per_conn_admit!(socket, peer_addr)
      @metrics.decrement(:requests_in_flight)
      request_count += 1
      # Don't close — keep the conn alive so the upstream peer can
      # retry after the in-flight request drains. Skip writer +
      # logging (we wrote a canned response above) and proceed to
      # the next iteration's read.
      set_idle_timeout(socket)
      next
    end
    begin
      status, headers, body = call_app(app, request)
    ensure
      @metrics.decrement(:requests_in_flight)
      per_conn_release! if @max_in_flight_per_conn && !skip_per_conn_fairness
    end

    # 2.1.0 (WS-1): if the app called `env['rack.hijack'].call` during
    # `call_app`, the connection has handed the socket over. We MUST
    # NOT write a response (the app is now driving the wire) and we
    # MUST NOT close the socket (the app owns it). The status/headers/body
    # tuple from the app is ignored on this path — Rack 3 spec calls this
    # out explicitly. Drop out of the per-request loop; the ensure block
    # will skip socket close because of @hijacked.
    if @hijacked
      @logger.debug do
        { message: 'rack hijack', method: request.method, path: request.path, peer_addr: peer_addr }
      end
      # Drop body if the app still returned one — apps occasionally
      # return [-1, {}, []] but some return real arrays out of habit.
      # We don't iterate or close the body; iterating would let it
      # write to the (now app-owned) socket via env['rack.input'] etc.
      # body.close is the one safe call (frees temp files), best-effort.
      body.close if body.respond_to?(:close)
      return
    end

    keep_alive = should_keep_alive?(request, status, headers)
    # 2.6-C — pass the per-response dispatch-mode override to the
    # writer.  Default `nil` means "use the writer's default
    # (fiber-yielding sendfile / userspace copy)".  Only
    # `:inline_blocking` currently flips the writer onto a
    # different code path (the Puma-style serial-per-thread
    # blocking-sendfile loop).  Forward-compatible — future per-
    # response dispatch modes plug in here without changing the
    # call-site shape.
    @writer.write(socket, status, headers, body, keep_alive: keep_alive,
                                                 dispatch_mode: @response_dispatch_mode)
    # 2.6-D — record whether this response engaged
    # `:inline_blocking` so the next request iteration can skip
    # the per-conn fairness admission check (see the
    # `skip_per_conn_fairness` branch above).  Sticky on
    # consecutive static responses; resets on the first non-
    # static response back on the same conn.
    @last_response_was_static_inline_blocking =
      @response_dispatch_mode == :inline_blocking
    @metrics.increment_status(status)
    log_request(request, status, request_started_at) if @log_requests
    # 2.4-C: per-route duration histogram observation. Templating the
    # path (e.g. `/users/123` → `/users/:id`) keeps cardinality
    # bounded; the templater itself is LRU-cached so the cost on a
    # repeated path is one Hash#[] + one Hash re-insert. We swallow
    # any exception — observability must never block a response.
    observe_request_duration(request, status, request_started_at)
    request_count += 1

    return unless keep_alive

    # Idle wait between requests: don't hold a fiber forever on a quiet conn.
    set_idle_timeout(socket)
  end
rescue ParseError => e
  @metrics.increment(:parse_errors)
  @logger.warn { { message: 'parse error', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 400, 'Bad Request')
  @metrics.increment_status(400)
rescue UnsupportedError => e
  @logger.warn { { message: 'unsupported request', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 501, 'Not Implemented')
  @metrics.increment_status(501)
rescue StandardError => e
  @metrics.increment(:app_errors)
  @logger.error do
    { message: 'unhandled in connection', error: e.message, error_class: e.class.name }
  end
ensure
  @metrics.decrement(:connections_active)
  # Flush any buffered access-log lines for this thread before letting
  # the connection go idle. Otherwise a low-traffic worker would hold
  # logs in its per-thread buffer indefinitely.
  @logger.flush_access_buffer if @log_requests && @logger.respond_to?(:flush_access_buffer)
  # 2.1.0 (WS-1): when the app full-hijacked the socket, ownership has
  # transferred. Hyperion MUST NOT close — the app may still be reading
  # from / writing to the wire (e.g. an open WebSocket) long after this
  # fiber exits. Skip the close branch entirely; the app is the sole
  # closer from this point on.
  unless @hijacked
    # 2.4-C: drop the per-worker kTLS gauge for this socket if it
    # was tracked at handshake time. No-op for plain TCP and for
    # TLS-without-kTLS sockets.
    Hyperion::TLS.untrack_ktls_handshake!(socket) if defined?(Hyperion::TLS)
    begin
      socket.close unless socket.closed?
    rescue StandardError
      # Already failing; swallow close errors so we don't mask the real cause.
    end
  end
end