Class: Hyperion::Connection

Inherits: Object
Defined in:
lib/hyperion/connection.rb

Overview

Drives one TCP connection through its lifecycle: read until the headers and body are complete, parse, dispatch via the Rack adapter, write the response, close. Phase 2 adds fiber scheduling and keep-alive; the public surface (#serve) is stable.

Phase 1 assumes blocking I/O: `socket.read(N)` blocks until N bytes arrive or EOF, so `break if chunk.nil? || chunk.empty?` correctly detects EOF in read_request. Phase 2 (fiber scheduler) introduces non-blocking semantics where short reads and EAGAIN must be distinguished from EOF — read_request will need to handle IO::WaitReadable explicitly at that point.
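A minimal sketch of the distinction Phase 2 has to make, using only stdlib sockets (the `read_step` helper here is illustrative, not Hyperion's read_request):

```ruby
require 'socket'

# Hypothetical helper: one non-blocking read step. With `exception: false`,
# read_nonblock returns :wait_readable for EAGAIN (socket open, no bytes yet)
# and nil for EOF (peer closed). A blocking read conflates these two cases.
def read_step(io, chunk_size = 16 * 1024)
  chunk = io.read_nonblock(chunk_size, exception: false)
  case chunk
  when :wait_readable then :eagain # retry later; NOT end of stream
  when nil            then :eof    # peer closed the connection
  else chunk                       # actual bytes
  end
end

a, b = UNIXSocket.pair
read_step(a)  # => :eagain (no data yet, but the connection is alive)
b.write('hi')
read_step(a)  # => "hi"
b.close
read_step(a)  # => :eof
```

Under a fiber scheduler, the `:eagain` branch is where the fiber yields back to the reactor instead of spinning.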

Constant Summary

READ_CHUNK = 16 * 1024
MAX_HEADER_BYTES = 64 * 1024
MAX_BODY_BYTES = 16 * 1024 * 1024
  16 MB cap. Phase 5 introduces streaming bodies.
HEADER_TERM = "\r\n\r\n"
TIMEOUT_SENTINEL = :__hyperion_read_timeout__
DEADLINE_SENTINEL = :__hyperion_request_deadline__
IDLE_KEEPALIVE_TIMEOUT_SECONDS = 5

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil, log_requests: nil) ⇒ Connection

Returns a new instance of Connection.



# File 'lib/hyperion/connection.rb', line 30

def initialize(parser: self.class.default_parser, writer: ResponseWriter.new, thread_pool: nil,
               log_requests: nil)
  @parser      = parser
  @writer      = writer
  @thread_pool = thread_pool
  # Cache module-level singletons once per Connection instance so the hot
  # path doesn't re-dispatch through Hyperion.metrics / Hyperion.logger
  # (each was a method call + ivar nil-check on every request).
  @metrics     = Hyperion.metrics
  @logger      = Hyperion.logger
  # Per-request access logging is ON by default (matches Puma+Rails
  # operator expectation). The hot path is optimised end-to-end: one
  # Process.clock_gettime per request, per-thread cached timestamp,
  # hand-rolled line builder, lock-free emit. Operator disables via
  # `--no-log-requests` or `HYPERION_LOG_REQUESTS=0`.
  @log_requests = log_requests.nil? ? Hyperion.log_requests? : log_requests
end

Class Method Details

.default_parser ⇒ Object

Default parser is the C-extension `CParser` when the extension is built; otherwise we fall back to the pure-Ruby `Parser`. Evaluated on each call because Ruby evaluates default kwargs at call time.



# File 'lib/hyperion/connection.rb', line 26

def self.default_parser
  defined?(::Hyperion::CParser) ? ::Hyperion::CParser.new : ::Hyperion::Parser.new
end
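The call-time evaluation matters: a default captured at `def` time would freeze whichever parser class existed when the file was loaded. A toy illustration of the mechanism (names here are invented, not Hyperion's):

```ruby
# Counter proves the default expression runs on every call, not at def time.
$default_evals = 0

def pick_parser
  $default_evals += 1
  :parser
end

def connect(parser: pick_parser)
  parser
end

connect
connect
$default_evals # => 2: one evaluation per call
connect(parser: :custom) # supplying the kwarg skips the default entirely
$default_evals # => still 2
```

So if the C extension is required after connection.rb is loaded, `default_parser` still picks it up on the next request.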

Instance Method Details

#serve(socket, app, max_request_read_seconds: 60) ⇒ Object



# File 'lib/hyperion/connection.rb', line 48

def serve(socket, app, max_request_read_seconds: 60)
  request_count = 0
  carry = +'' # bytes already pulled off the socket but past the prev request boundary
  peer_addr = peer_address(socket)
  @metrics.increment(:connections_accepted)
  @metrics.increment(:connections_active)
  loop do
    # Per-request wallclock deadline. Captured fresh for every request so
    # long-lived keep-alive sessions with many small requests don't
    # falsely trip after the cumulative budget elapses.
    request_started_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) if max_request_read_seconds
    buffer = read_request(socket, carry, deadline_started_at: request_started_clock,
                                         max_request_read_seconds: max_request_read_seconds,
                                         peer_addr: peer_addr)
    return unless buffer

    if buffer == TIMEOUT_SENTINEL
      # Idle timeout between keep-alive requests: close silently — the peer
      # never started a new request, so there's nothing to 408 about.
      @metrics.increment(:read_timeouts)
      return if request_count.positive?

      safe_write_error(socket, 408, 'Request Timeout')
      @metrics.increment_status(408)
      return
    end

    # Slowloris-style abort: deadline tripped during read. We've already
    # written the 408 (best-effort) inside read_request; close out here.
    return if buffer == DEADLINE_SENTINEL

    request, body_end = @parser.parse(buffer)
    carry = +(buffer.byteslice(body_end, buffer.bytesize - body_end) || '')
    request = enrich_with_peer(request, peer_addr) if peer_addr && request.peer_address.nil?

    @metrics.increment(:bytes_read, body_end)
    @metrics.increment(:requests_total)
    @metrics.increment(:requests_in_flight)
    request_started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) if @log_requests
    begin
      status, headers, body = call_app(app, request)
    ensure
      @metrics.decrement(:requests_in_flight)
    end

    keep_alive = should_keep_alive?(request, status, headers)
    @writer.write(socket, status, headers, body, keep_alive: keep_alive)
    @metrics.increment_status(status)
    log_request(request, status, request_started_at) if @log_requests
    request_count += 1

    return unless keep_alive

    # Idle wait between requests: don't hold a fiber forever on a quiet conn.
    set_idle_timeout(socket)
  end
rescue ParseError => e
  @metrics.increment(:parse_errors)
  @logger.warn { { message: 'parse error', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 400, 'Bad Request')
  @metrics.increment_status(400)
rescue UnsupportedError => e
  @logger.warn { { message: 'unsupported request', error: e.message, error_class: e.class.name } }
  safe_write_error(socket, 501, 'Not Implemented')
  @metrics.increment_status(501)
rescue StandardError => e
  @metrics.increment(:app_errors)
  @logger.error do
    { message: 'unhandled in connection', error: e.message, error_class: e.class.name }
  end
ensure
  @metrics.decrement(:connections_active)
  # Flush any buffered access-log lines for this thread before letting
  # the connection go idle. Otherwise a low-traffic worker would hold
  # logs in its per-thread buffer indefinitely.
  @logger.flush_access_buffer if @log_requests && @logger.respond_to?(:flush_access_buffer)
  begin
    socket.close unless socket.closed?
  rescue StandardError
    # Already failing; swallow close errors so we don't mask the real cause.
  end
end
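The `carry` handoff in serve can be exercised in isolation. A pipelining client may send its second request in the same TCP segment as the first; the byteslice past body_end seeds the next read_request call so those bytes are not lost. The header-only boundary computation below is a stand-in for Hyperion's parser, which would also account for Content-Length:

```ruby
HEADER_TERM = "\r\n\r\n"

# Two pipelined GET requests arriving in a single read.
buffer = "GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n"

# For a bodiless request, the parse boundary sits just past the header
# terminator; the parser's body_end return plays this role in serve.
body_end = buffer.index(HEADER_TERM) + HEADER_TERM.bytesize

# Same expression serve uses: everything past the boundary becomes the
# carry buffer for the next request on this connection.
carry = +(buffer.byteslice(body_end, buffer.bytesize - body_end) || '')
carry # => "GET /b HTTP/1.1\r\nHost: x\r\n\r\n"
```

The unary `+` ensures the carry string is mutable even under frozen-string-literal semantics, and the `|| ''` guards the case where the buffer ends exactly at the boundary (byteslice with a zero-length remainder still returns a string, but a defensive fallback keeps carry a String in all cases).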