Class: Hyperion::Connection
- Inherits: Object
- Defined in: lib/hyperion/connection.rb
Overview
Drives one TCP connection through its lifecycle: read until headers complete + body, parse, dispatch via Rack adapter, write, close. Phase 2 adds fiber scheduling and keep-alive; the public surface (#serve) is stable.
Phase 1 assumes blocking I/O: socket.read(N) blocks until N bytes or EOF, so `break if chunk.nil? || chunk.empty?` correctly detects EOF in read_request. Phase 2 (fiber scheduler) introduces non-blocking semantics where short reads and EAGAIN must be distinguished from EOF — read_request will need to handle IO::WaitReadable explicitly at that point.
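A minimal sketch of the distinction Phase 2 will need, assuming Ruby's `read_nonblock(..., exception: false)` semantics (`:wait_readable` on EAGAIN, `nil` on EOF). The method name and arguments are illustrative, not the real `read_request`:

# Illustrative only: EAGAIN-vs-EOF handling under a fiber scheduler.
def read_until_headers(io, buf)
  loop do
    chunk = io.read_nonblock(READ_CHUNK, exception: false)
    case chunk
    when :wait_readable
      io.wait_readable        # parks this fiber; the scheduler resumes it
    when nil
      return :eof             # peer closed the socket: genuine end-of-stream
    else
      buf << chunk            # short read: partial data arrived, keep going
      return buf if buf.include?(HEADER_TERM)
    end
  end
end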
Constant Summary
- READ_CHUNK = 16 * 1024
- MAX_HEADER_BYTES = 64 * 1024
- MAX_BODY_BYTES = 16 * 1024 * 1024
  16 MB cap. Phase 5 introduces streaming bodies.
- HEADER_TERM = "\r\n\r\n"
- TIMEOUT_SENTINEL = :__hyperion_read_timeout__
- DEADLINE_SENTINEL = :__hyperion_request_deadline__
- OVERSIZED_BODY_SENTINEL = :__hyperion_oversized_body__
- IDLE_KEEPALIVE_TIMEOUT_SECONDS = 5
- INBUF_INITIAL_CAPACITY = 8 * 1024
  Phase 2b (1.7.1) — per-Connection pre-sized scratch buffer for the read accumulator. Most HTTP/1.1 request lines + headers fit in a few hundred bytes; 8 KiB covers ~99% of legitimate traffic without ever re-allocating. We reuse the same String across keep-alive requests on the same connection (`clear` between requests preserves capacity; see the capacity-reuse sketch after this summary). Requests larger than 8 KiB still parse correctly — `String#<<` grows the underlying buffer transparently — they just pay the realloc the first time, same as the pre-1.7.1 behaviour.
- REJECT_413_PAYLOAD_TOO_LARGE =
Pre-built canned 413 — body is small + plain text, connection forced closed. Reused across every oversized-CL rejection so the DoS-defense path stays allocation-free and never has to dip into ResponseWriter (which would require a full Rack-style headers hash for an error we can answer with frozen bytes). The `content-length: 18` matches the body byte count exactly (checked in the sketch after this summary).
(+"HTTP/1.1 413 Payload Too Large\r\n" \
   "content-type: text/plain\r\n" \
   "content-length: 18\r\n" \
   "connection: close\r\n" \
   "\r\n" \
   "payload too large\n").freeze
- REJECT_503_PER_CONN_OVERLOAD =
2.3-B per-conn fairness 503. Connection stays alive (no `connection: close` header) so the upstream peer can retry the request in 1 s — nginx-friendly. Body is small + plain text + frozen so the reject path stays allocation-free on the hot path.
(+"HTTP/1.1 503 Service Unavailable\r\n" \
   "content-type: text/plain\r\n" \
   "content-length: 31\r\n" \
   "retry-after: 1\r\n" \
   "\r\n" \
   "per-connection overload, retry\n").freeze
- REQUEST_DURATION_BUCKETS =
2.4-C — histogram bucket edges for the per-route request duration histogram. A powers-of-5 spread covers 1 ms to 10 s, the realistic range for any HTTP-served workload (bucket semantics sketched after this summary). Frozen so the same Array is reused across every Connection (cheaper histogram registration, no per-conn allocation).
[0.001, 0.005, 0.025, 0.1, 0.5, 2.5, 10.0].freeze
- REQUEST_DURATION_HISTOGRAM = :hyperion_request_duration_seconds
- STATUS_CLASS =
  Pre-bucketed status-class strings. The lookup `STATUS_CLASS[status / 100]` avoids a `"#{n}xx"` string interpolation per request.
  %w[0xx 1xx 2xx 3xx 4xx 5xx 6xx 7xx 8xx 9xx].each(&:freeze).freeze
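A quick capacity-reuse sketch for INBUF_INITIAL_CAPACITY (illustrative requests; the capacity-preservation behaviour of `String#clear` is the property the constant relies on):

buf = String.new(capacity: 8 * 1024, encoding: Encoding::ASCII_8BIT)  # INBUF_INITIAL_CAPACITY
buf << "GET / HTTP/1.1\r\nhost: example\r\n\r\n"       # fits the pre-sized buffer
buf.clear                                              # drops contents, keeps capacity
buf << "GET /next HTTP/1.1\r\nhost: example\r\n\r\n"   # appends without re-allocating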
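The content-length values in the two canned responses can be checked against their bodies directly:

"payload too large\n".bytesize               # => 18, matches the 413's content-length
"per-connection overload, retry\n".bytesize  # => 31, matches the 503's content-length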
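And a sketch of how an observed duration maps onto REQUEST_DURATION_BUCKETS, assuming standard cumulative (Prometheus-style) histogram semantics; the real observation path is not shown in this section:

elapsed = 0.042  # seconds, hypothetical request
REQUEST_DURATION_BUCKETS.select { |le| elapsed <= le }
# => [0.1, 0.5, 2.5, 10.0]  every cumulative bucket at or above the value
#    increments, plus the implicit +Inf bucket that all observations land in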
Instance Attribute Summary
- #response_dispatch_mode ⇒ Object
  2.6-C — per-response dispatch-mode override.
- #socket ⇒ Object (readonly)
  2.1.0 (WS-1): the connection itself caches the live socket so that `hijack!` (called from inside the app, possibly on a thread-pool worker thread) can reach back and yield it.
Class Method Summary
- .c_chunked_available? ⇒ Boolean
  Whether Hyperion::CParser.chunked_body_complete? is available.
- .default_parser ⇒ Object
  Default parser is the C-extension `CParser` when the extension is built; otherwise we fall back to the pure-Ruby `Parser`.
Instance Method Summary
- #hijack! ⇒ Object
  Called by the Rack app (via `env['rack.hijack'].call`).
- #hijack_buffered ⇒ Object
  Bytes the connection had buffered past the parsed request boundary at the moment we entered the dispatch step (pipelined keep-alive carry, or — for an Upgrade — early bytes the client sent right after the headers, before they could see our 101 response).
- #hijacked? ⇒ Boolean
- #initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil, log_requests: nil, max_body_bytes: MAX_BODY_BYTES, runtime: nil, max_in_flight_per_conn: nil, path_templater: nil, route_table: nil) ⇒ Connection (constructor)
  A new instance of Connection.
- #register_request_duration_histogram! ⇒ Object
  2.4-C: register the per-route histogram family on this Connection's metrics sink.
- #serve(socket, app, max_request_read_seconds: 60) ⇒ Object
Constructor Details
#initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil, log_requests: nil, max_body_bytes: MAX_BODY_BYTES, runtime: nil, max_in_flight_per_conn: nil, path_templater: nil, route_table: nil) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/hyperion/connection.rb', line 77

def initialize(parser: self.class.default_parser, writer: ResponseWriter.new,
               thread_pool: nil, log_requests: nil,
               max_body_bytes: MAX_BODY_BYTES, runtime: nil,
               max_in_flight_per_conn: nil, path_templater: nil,
               route_table: nil)
  @parser = parser
  @writer = writer
  @thread_pool = thread_pool
  @max_body_bytes = max_body_bytes

  # 2.3-B: per-conn fairness cap. nil disables the check entirely
  # (the hot path stays branchless). Positive integer sets the
  # in-flight ceiling. The counter + dedup-warn flag live as ivars
  # so a single Connection's lifetime sees one warn at most, not
  # one per rejected request.
  @max_in_flight_per_conn = max_in_flight_per_conn
  @in_flight = 0
  @in_flight_mutex = Mutex.new if max_in_flight_per_conn
  @overload_warned = false

  # 1.7.0: explicit Runtime injection. When the caller passes
  # `runtime:`, that runtime is the sole source of metrics + logger
  # for this connection — no implicit fallback to module-level
  # singletons. When omitted, fall back to `Runtime.default` so
  # legacy callers keep working untouched.
  #
  # We still cache the metrics/logger refs in ivars (vs reading
  # `runtime.metrics` per request) so the hot path doesn't pay a
  # method-dispatch per increment. Long-lived keep-alive connections
  # therefore see a Runtime swap only at construction — that's a
  # 1.7.0 limitation; 2.0 drops the singleton entirely and the
  # ivar cache becomes the only path.
  if runtime
    @runtime = runtime
    @metrics = runtime.metrics
    @logger = runtime.logger
  else
    # No explicit runtime → keep the 1.6.x shape: ivars cache the
    # module-level accessors. This preserves stub seams used by
    # existing specs (`allow(Hyperion).to receive(:metrics)`) and
    # the `Hyperion.instance_variable_set(:@metrics, ...)` swap.
    @runtime = Hyperion::Runtime.default
    @metrics = Hyperion.metrics
    @logger = Hyperion.logger
  end

  # Per-request access logging is ON by default (matches Puma+Rails
  # operator expectation). The hot path is optimised end-to-end: one
  # Process.clock_gettime per request, per-thread cached timestamp,
  # hand-rolled line builder, lock-free emit. Operator disables via
  # `--no-log-requests` or `HYPERION_LOG_REQUESTS=0`.
  @log_requests = if log_requests.nil?
                    # Per-Connection override absent → consult the
                    # Runtime's logging config (1.7.0+) which falls
                    # through to `Hyperion.log_requests?` (env +
                    # default ON).
                    Hyperion.log_requests?
                  else
                    log_requests
                  end

  # 2.4-C: cache the path-templater ref at construction. Reading it
  # via Hyperion::Metrics.default_path_templater per request would
  # add a method dispatch + a memo branch on every observation — we
  # keep the existing pattern of caching boot-time refs as ivars so
  # the per-request observe stays a single Hash lookup.
  @path_templater = path_templater || Hyperion::Metrics.default_path_templater

  # 2.10-D — direct-dispatch route table. The hot-path lookup
  # is `@route_table&.lookup(method, path)` so the nil-default
  # case (no operator-registered direct routes — the
  # overwhelming majority of 2.x deployments) collapses to a
  # single `nil`-test before falling through to the Rack
  # adapter. When `route_table:` is passed we honour the
  # explicit value (test seam / multi-tenant). When omitted
  # AND the Hyperion::Server class is loaded, we resolve to
  # the process-wide singleton; ad-hoc Connection callers in
  # specs that don't load Server keep the nil fallback.
  @route_table = if route_table
                   route_table
                 elsif defined?(Hyperion::Server) && Hyperion::Server.respond_to?(:route_table)
                   Hyperion::Server.route_table
                 end

  register_request_duration_histogram!
end
Instance Attribute Details
#response_dispatch_mode ⇒ Object
2.6-C — per-response dispatch-mode override. Reset to `nil` at the top of each request iteration; the Rack adapter sets this to `:inline_blocking` when it auto-detects a static-file body (`body.respond_to?(:to_path)`) or when the app explicitly opts in via `env['hyperion.dispatch_mode'] = :inline_blocking`. The response-write path reads it back here in `serve` and forwards the symbol to `ResponseWriter#write` so the writer can pick the blocking-sendfile variant.
The override is per-RESPONSE, NOT per-connection: the connection-wide dispatch mode (resolved at boot from `tls`, `async_io`, `thread_count`, ALPN) stays whatever the operator configured. Only the response-write loop downgrades.
# File 'lib/hyperion/connection.rb', line 194

def response_dispatch_mode
  @response_dispatch_mode
end
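A hypothetical app-side opt-in using the `env['hyperion.dispatch_mode']` key described above (the app itself is illustrative; a `to_path`-responding body would also be auto-detected without the explicit key):

app = lambda do |env|
  env['hyperion.dispatch_mode'] = :inline_blocking   # explicit per-response opt-in
  file = File.open('/var/www/large.bin', 'rb')       # File responds to #to_path
  [200, { 'content-type' => 'application/octet-stream' }, file]
end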
#socket ⇒ Object (readonly)
2.1.0 (WS-1): the connection itself caches the live socket so that `hijack!` (called from inside the app, possibly on a thread-pool worker thread) can reach back and yield it. `@hijacked` is the flag that gates writer + cleanup behaviour after the app returns. Reset at the top of each request iteration: a keep-alive client that does NOT hijack on request N must still get the normal response path, and a hijack on request N+1 should not be observed during request N.
# File 'lib/hyperion/connection.rb', line 179

def socket
  @socket
end
Class Method Details
.c_chunked_available? ⇒ Boolean
Whether Hyperion::CParser.chunked_body_complete? is available. Probed lazily at first use; memoised in a class-level ivar to keep the per-request hot path branchless.
# File 'lib/hyperion/connection.rb', line 940

def self.c_chunked_available?
  return @c_chunked_available unless @c_chunked_available.nil?

  @c_chunked_available =
    defined?(::Hyperion::CParser) &&
    ::Hyperion::CParser.respond_to?(:chunked_body_complete?)
end
.default_parser ⇒ Object
Default parser is the C-extension `CParser` when the extension is built; otherwise we fall back to the pure-Ruby `Parser`. Evaluated on each call because Ruby evaluates default kwargs at call time.
# File 'lib/hyperion/connection.rb', line 60

def self.default_parser
  defined?(::Hyperion::CParser) ? ::Hyperion::CParser.new : ::Hyperion::Parser.new
end
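Because the default is re-evaluated per call, a Connection constructed before the extension loads gets the pure-Ruby parser while one constructed afterwards gets the C parser. A sketch (the require path is an assumption, not the gem's documented entry point):

conn_a = Hyperion::Connection.new   # CParser not yet defined: Parser.new
begin
  require 'hyperion/c_parser'       # assumed extension path, illustrative
rescue LoadError
  # extension not built: both connections keep the pure-Ruby parser
end
conn_b = Hyperion::Connection.new   # CParser defined: CParser.new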
Instance Method Details
#hijack! ⇒ Object
Called by the Rack app (via `env['rack.hijack'].call`). Flips the `@hijacked` flag — Connection#serve checks this after `call_app` returns and skips the writer + the ensure-block close. Returns the raw socket IO so the app can speak any post-HTTP protocol on it.
Idempotent: a subsequent call returns the same socket without re-flipping (the flag is monotonic). Defensive — apps occasionally do `io = env['rack.hijack'].call; io2 = env['rack.hijack'].call` when chaining middleware.
# File 'lib/hyperion/connection.rb', line 209

def hijack!
  @hijacked = true
  Hyperion.metrics.increment(:rack_hijacks) if defined?(Hyperion) && Hyperion.respond_to?(:metrics)
  @socket
end
#hijack_buffered ⇒ Object
Bytes the connection had buffered past the parsed request boundary at the moment we entered the dispatch step (pipelined keep-alive carry, or — for an Upgrade — early bytes the client sent right after the headers, before they could see our 101 response). Returns a binary-encoded String (possibly empty). Captured fresh per request inside `serve` before `call_app` so reads from the socket past this point still go to the OS buffer; the carry is the application's responsibility to drain.
# File 'lib/hyperion/connection.rb', line 223

def hijack_buffered
  @hijack_buffered ||= +''
end
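A hypothetical full-hijack app that drains the carry first. `env['rack.hijack']` and `env['hyperion.hijack_buffered']` are the keys this page documents; the echo loop is illustrative:

app = lambda do |env|
  io    = env['rack.hijack'].call            # flips @hijacked, returns the raw socket
  carry = env['hyperion.hijack_buffered']    # bytes past the parsed request boundary
  Thread.new do
    io.write(carry) unless carry.empty?      # the app's responsibility to drain
    loop { io.write(io.readpartial(4096)) }  # naive echo until the peer closes
  rescue EOFError, IOError
  ensure
    io.close                                 # the app is the sole closer now
  end
  [-1, {}, []]                               # tuple is ignored on the hijack path
end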
#hijacked? ⇒ Boolean
# File 'lib/hyperion/connection.rb', line 196

def hijacked?
  @hijacked == true
end
#register_request_duration_histogram! ⇒ Object
2.4-C: register the per-route histogram family on this Connection's metrics sink. Idempotent — `Metrics#register_histogram` no-ops on re-registration with the same shape. Called once per Connection so the histogram exists before the first observe.
# File 'lib/hyperion/connection.rb', line 160

def register_request_duration_histogram!
  @metrics.register_histogram(
    REQUEST_DURATION_HISTOGRAM,
    buckets: REQUEST_DURATION_BUCKETS,
    label_keys: %w[method path status]
  )
rescue StandardError
  # Histogram registration is observability — never block a Connection
  # from booting because the metrics sink misbehaved.
  nil
end
#serve(socket, app, max_request_read_seconds: 60) ⇒ Object
# File 'lib/hyperion/connection.rb', line 227

def serve(socket, app, max_request_read_seconds: 60)
  request_count = 0
  @socket = socket
  @hijacked = false

  # 2.6-D — sticky flag set after each `:inline_blocking` response
  # so the next request iteration on the same keep-alive
  # connection can bypass the per-conn fairness admission check
  # (and the bookkeeping it carries). See the
  # `skip_per_conn_fairness` branch in the request loop below.
  @last_response_was_static_inline_blocking = false

  # Phase 2b (1.7.1): pre-size the read accumulator once per connection
  # and reuse it across keep-alive requests. `String#clear` between
  # requests preserves the underlying capacity, so subsequent appends
  # don't pay the realloc tax. Pre-1.7.1 allocated a fresh `+''` per
  # request; per-connection reuse is a strict win because the previous
  # request's carry-over bytes (pipelined input) are copied into this
  # same buffer at the bottom of the loop instead of into a new String.
  @inbuf ||= String.new(capacity: INBUF_INITIAL_CAPACITY, encoding: Encoding::ASCII_8BIT)

  peer_addr = peer_address(socket)
  @metrics.increment(:connections_accepted)
  @metrics.increment(:connections_active)

  loop do
    # Per-request wallclock deadline. Captured fresh for every request so
    # long-lived keep-alive sessions with many small requests don't
    # falsely trip after the cumulative budget elapses.
    request_started_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) if max_request_read_seconds

    # 2.6-C — clear the per-response dispatch-mode override at the
    # top of every request iteration. The Rack adapter sets it
    # *during* `app.call` (auto-detect on `to_path` body or
    # explicit `env['hyperion.dispatch_mode']` override) and the
    # writer reads it back; a keep-alive client whose request N
    # was static must NOT have request N+1 inherit the
    # `:inline_blocking` flag if request N+1's body is a streaming
    # response.
    @response_dispatch_mode = nil

    buffer = read_request(socket, @inbuf,
                          deadline_started_at: request_started_clock,
                          max_request_read_seconds: max_request_read_seconds,
                          peer_addr: peer_addr)
    return unless buffer

    if buffer == TIMEOUT_SENTINEL
      # Idle timeout between keep-alive requests: close silently — the peer
      # never started a new request, so there's nothing to 408 about.
      @metrics.increment(:read_timeouts)
      return if request_count.positive?

      safe_write_error(socket, 408, 'Request Timeout')
      @metrics.increment_status(408)
      return
    end

    # Slowloris-style abort: deadline tripped during read. We've already
    # written the 408 (best-effort) inside read_request; close out here.
    return if buffer == DEADLINE_SENTINEL

    # DoS-defense: client declared a Content-Length larger than
    # max_body_bytes. We've already written the canned 413 + close inside
    # read_request, BEFORE reading any body bytes. Drop the connection.
    return if buffer == OVERSIZED_BODY_SENTINEL

    request, body_end = @parser.parse(buffer)

    # Carry over any pipelined trailing bytes for the next iteration. We
    # rewrite @inbuf in place — `replace` keeps the underlying capacity
    # allocation, so the next request starts with a warm 8 KiB buffer.
    #
    # 2.1.0 (WS-1): snapshot the carry BEFORE we collapse it back into
    # the read buffer. If the app full-hijacks this request, those
    # bytes are the application's responsibility (sent right after the
    # Upgrade headers, etc.) — exposed via `env['hyperion.hijack_buffered']`.
    # On the non-hijack hot path the snapshot is empty (no allocation
    # past the constant `EMPTY_BIN`) for keep-alive without pipelining.
    @hijack_buffered = if buffer.bytesize > body_end
                         buffer.byteslice(body_end, buffer.bytesize - body_end).b
                       else
                         EMPTY_BIN
                       end
    carry_into_inbuf!(buffer, body_end)

    request = enrich_with_peer(request, peer_addr) if peer_addr && request.peer_address.nil?

    @metrics.increment(:bytes_read, body_end)
    @metrics.increment(:requests_total)
    @metrics.increment(:requests_in_flight)

    # 2.4-C: capture start time for the per-route duration histogram.
    # Same Process.clock_gettime that the access-log path was already
    # paying — at default-ON log_requests the second call here is
    # avoided (we reuse `request_started_at`).
    request_started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # 2.10-D — direct-dispatch fast path. Bypasses the Rack
    # adapter entirely (no env-hash build, no middleware chain,
    # no body-iteration overhead) on routes the operator
    # registered via `Hyperion::Server.handle(:GET, '/path',
    # handler)` or `.handle_static(...)`. Lifecycle hooks
    # still fire so trace instrumentation works regardless of
    # dispatch shape.
    #
    # Lookup is O(1) (two Hash#[] hits) and the nil-default
    # case (no direct routes registered — the overwhelming
    # majority of deployments) collapses to one nil-test plus
    # one Hash#[] miss before falling through to the regular
    # path; cost on the regular path is < 1 us.
    if @route_table && (direct_handler = @route_table.lookup(request.method, request.path))
      dispatch_direct!(socket, request, direct_handler, request_started_at, peer_addr)
      request_count += 1
      break unless should_keep_alive_after_direct?(request)

      set_idle_timeout(socket)
      next
    end

    # 2.3-B per-conn fairness gate. Returns true when the slot was
    # reserved (caller must release in ensure), false when the cap
    # was hit and a 503 was emitted. nil cap → admit always (hot
    # path stays branchless).
    #
    # 2.6-D — skip the fairness check entirely on connections whose
    # previous response was `:inline_blocking` (auto-detected
    # static-file traffic). Static streams are dominated by the
    # write phase, not concurrent app.call invocations, so the
    # per-conn fairness cap is dead weight here — its purpose is
    # to throttle dynamic-route concurrency on a single keep-alive
    # connection. Static-asset connections (CDN origins, signed-
    # download responders) typically run a long sequence of
    # `to_path` responses; once the first one auto-detects, the
    # remaining requests skip the admit / release / metric trio.
    # The flag flips back to false the moment a non-static
    # response lands on the same connection.
    skip_per_conn_fairness = @last_response_was_static_inline_blocking
    if @max_in_flight_per_conn && !skip_per_conn_fairness && !per_conn_admit!(socket, peer_addr)
      @metrics.decrement(:requests_in_flight)
      request_count += 1
      # Don't close — keep the conn alive so the upstream peer can
      # retry after the in-flight request drains. Skip writer +
      # logging (we wrote a canned response above) and proceed to
      # the next iteration's read.
      set_idle_timeout(socket)
      next
    end

    begin
      status, headers, body = call_app(app, request)
    ensure
      @metrics.decrement(:requests_in_flight)
      per_conn_release! if @max_in_flight_per_conn && !skip_per_conn_fairness
    end

    # 2.1.0 (WS-1): if the app called `env['rack.hijack'].call` during
    # `call_app`, the connection has handed the socket over. We MUST
    # NOT write a response (the app is now driving the wire) and we
    # MUST NOT close the socket (the app owns it). The status/headers/body
    # tuple from the app is ignored on this path — Rack 3 spec calls this
    # out explicitly. Drop out of the per-request loop; the ensure block
    # will skip socket close because of @hijacked.
    if @hijacked
      @logger.debug do
        { message: 'rack hijack', method: request.method, path: request.path, peer_addr: peer_addr }
      end
      # Drop body if the app still returned one — apps occasionally
      # return [-1, {}, []] but some return real arrays out of habit.
      # We don't iterate or close the body; iterating would let it
      # write to the (now app-owned) socket via env['rack.input'] etc.
      # body.close is the one safe call (frees temp files), best-effort.
      body.close if body.respond_to?(:close)
      return
    end

    keep_alive = should_keep_alive?(request, status, headers)

    # 2.6-C — pass the per-response dispatch-mode override to the
    # writer. Default `nil` means "use the writer's default
    # (fiber-yielding sendfile / userspace copy)". Only
    # `:inline_blocking` currently flips the writer onto a
    # different code path (the Puma-style serial-per-thread
    # blocking-sendfile loop). Forward-compatible — future per-
    # response dispatch modes plug in here without changing the
    # call-site shape.
    @writer.write(socket, status, headers, body,
                  keep_alive: keep_alive,
                  dispatch_mode: @response_dispatch_mode)

    # 2.6-D — record whether this response engaged
    # `:inline_blocking` so the next request iteration can skip
    # the per-conn fairness admission check (see the
    # `skip_per_conn_fairness` branch above). Sticky on
    # consecutive static responses; resets on the first non-
    # static response back on the same conn.
    @last_response_was_static_inline_blocking = @response_dispatch_mode == :inline_blocking

    @metrics.increment_status(status)
    log_request(request, status, request_started_at) if @log_requests

    # 2.4-C: per-route duration histogram observation. Templating the
    # path (e.g. `/users/123` → `/users/:id`) keeps cardinality
    # bounded; the templater itself is LRU-cached so the cost on a
    # repeated path is one Hash#[] + one Hash re-insert. We swallow
    # any exception — observability must never block a response.
    observe_request_duration(request, status, request_started_at)

    request_count += 1
    return unless keep_alive

    # Idle wait between requests: don't hold a fiber forever on a quiet conn.
    set_idle_timeout(socket)
  end
rescue ParseError => e
  @metrics.increment(:parse_errors)
  @logger.warn { { message: 'parse error', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 400, 'Bad Request')
  @metrics.increment_status(400)
rescue UnsupportedError => e
  @logger.warn { { message: 'unsupported request', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 501, 'Not Implemented')
  @metrics.increment_status(501)
rescue StandardError => e
  @metrics.increment(:app_errors)
  @logger.error do
    { message: 'unhandled in connection', error: e.message, error_class: e.class.name }
  end
ensure
  @metrics.decrement(:connections_active)

  # Flush any buffered access-log lines for this thread before letting
  # the connection go idle. Otherwise a low-traffic worker would hold
  # logs in its per-thread buffer indefinitely.
  @logger.flush_access_buffer if @log_requests && @logger.respond_to?(:flush_access_buffer)

  # 2.1.0 (WS-1): when the app full-hijacked the socket, ownership has
  # transferred. Hyperion MUST NOT close — the app may still be reading
  # from / writing to the wire (e.g. an open WebSocket) long after this
  # fiber exits. Skip the close branch entirely; the app is the sole
  # closer from this point on.
  unless @hijacked
    # 2.4-C: drop the per-worker kTLS gauge for this socket if it
    # was tracked at handshake time. No-op for plain TCP and for
    # TLS-without-kTLS sockets.
    Hyperion::TLS.untrack_ktls_handshake!(socket) if defined?(Hyperion::TLS)
    begin
      socket.close unless socket.closed?
    rescue StandardError
      # Already failing; swallow close errors so we don't mask the real cause.
    end
  end
end