Class: Hyperion::Worker
- Inherits:
-
Object
- Object
- Hyperion::Worker
- Defined in:
- lib/hyperion/worker.rb
Overview
Worker process. Receives a listening socket (or binds its own) and runs a `Hyperion::Server` (fiber accept loop) until SIGTERM or SIGINT.
Two listener sources, picked by the master per-OS:
-
`:share` mode (macOS / BSD): the master forwards a pre-bound `TCPServer` / `OpenSSL::SSL::SSLServer` via the `listener:` kwarg. The worker uses it as-is — the fd was inherited across fork.
-
`:reuseport` mode (Linux): no listener is passed. The worker binds its own `Socket` with `SO_REUSEPORT` set so the kernel can hash incoming connections across the sibling sockets.
Instance Method Summary collapse
-
#initialize(host:, port:, app:, read_timeout:, tls: nil, thread_count: Server::DEFAULT_THREAD_COUNT, config: nil, worker_index: 0, listener: nil, max_pending: nil, max_request_read_seconds: 60, h2_settings: nil, async_io: nil, runtime: nil, accept_fibers_per_worker: 1, h2_max_total_streams: nil, admin_listener_port: nil, admin_listener_host: '127.0.0.1', admin_token: nil, tls_session_cache_size: TLS::DEFAULT_SESSION_CACHE_SIZE, tls_ticket_key_rotation_signal: :USR2, tls_ktls: :auto, io_uring: :off, max_in_flight_per_conn: nil, tls_handshake_rate_limit: :unlimited, preload_static_dirs: nil) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
Constructor Details
#initialize(host:, port:, app:, read_timeout:, tls: nil, thread_count: Server::DEFAULT_THREAD_COUNT, config: nil, worker_index: 0, listener: nil, max_pending: nil, max_request_read_seconds: 60, h2_settings: nil, async_io: nil, runtime: nil, accept_fibers_per_worker: 1, h2_max_total_streams: nil, admin_listener_port: nil, admin_listener_host: '127.0.0.1', admin_token: nil, tls_session_cache_size: TLS::DEFAULT_SESSION_CACHE_SIZE, tls_ticket_key_rotation_signal: :USR2, tls_ktls: :auto, io_uring: :off, max_in_flight_per_conn: nil, tls_handshake_rate_limit: :unlimited, preload_static_dirs: nil) ⇒ Worker
Returns a new instance of Worker.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/hyperion/worker.rb', line 19

# Captures the settings a worker needs; nothing is bound, logged or started
# here. Every argument is stored verbatim in an instance variable of the
# same name, except +config+, which falls back to a fresh
# +Hyperion::Config+ when nil.
#
# @param host [Object] listen host; logged and handed to +Server.new+ by #run
# @param port [Object] listen port; logged and handed to +Server.new+ by #run
# @param app [Object] application object handed to +Server.new+ by #run
# @param tls [Object, nil] when truthy #run logs an +https+ URL, otherwise
#   +http+; also handed to +Server.new+
# @param config [Object, nil] source of the +on_worker_boot+ and
#   +on_worker_shutdown+ hook lists that #run invokes
# @param worker_index [Integer] value passed to each boot/shutdown hook and
#   included in the startup log entry
# @param listener [Object, nil] pre-bound listener to adopt; when nil #run
#   calls +build_reuseport_listener+ instead
# @param io_uring [Symbol] +:off+ makes #run report an io_uring gauge of 0
#   without consulting +Hyperion::IOUring+
# @param tls_ticket_key_rotation_signal [Symbol] stored only; #run does not
#   pass it to +Server.new+ (presumably read by the TLS rotation signal
#   handler -- confirm against that helper)
#
# All remaining keyword arguments are stored as-is and handed to
# +Server.new+ by #run under the same keyword name.
#
# @return [Worker]
def initialize(host:, port:, app:, read_timeout:,
               tls: nil,
               thread_count: Server::DEFAULT_THREAD_COUNT,
               config: nil,
               worker_index: 0,
               listener: nil,
               max_pending: nil,
               max_request_read_seconds: 60,
               h2_settings: nil,
               async_io: nil,
               runtime: nil,
               accept_fibers_per_worker: 1,
               h2_max_total_streams: nil,
               admin_listener_port: nil,
               admin_listener_host: '127.0.0.1',
               admin_token: nil,
               tls_session_cache_size: TLS::DEFAULT_SESSION_CACHE_SIZE,
               tls_ticket_key_rotation_signal: :USR2,
               tls_ktls: :auto,
               io_uring: :off,
               max_in_flight_per_conn: nil,
               tls_handshake_rate_limit: :unlimited,
               preload_static_dirs: nil)
  # Worker identity and lifecycle hooks.
  @worker_index = worker_index
  @config       = config || Hyperion::Config.new

  # Listening endpoint and the application served on it.
  @host     = host
  @port     = port
  @listener = listener
  @app      = app

  # Request handling limits and execution settings.
  @read_timeout             = read_timeout
  @max_request_read_seconds = max_request_read_seconds
  @max_pending              = max_pending
  @max_in_flight_per_conn   = max_in_flight_per_conn
  @thread_count             = thread_count
  @accept_fibers_per_worker = accept_fibers_per_worker
  @async_io                 = async_io
  @runtime                  = runtime
  @io_uring                 = io_uring
  @preload_static_dirs      = preload_static_dirs

  # HTTP/2 settings.
  @h2_settings          = h2_settings
  @h2_max_total_streams = h2_max_total_streams

  # TLS settings.
  @tls                            = tls
  @tls_session_cache_size         = tls_session_cache_size
  @tls_ticket_key_rotation_signal = tls_ticket_key_rotation_signal
  @tls_ktls                       = tls_ktls
  @tls_handshake_rate_limit       = tls_handshake_rate_limit

  # Admin listener settings.
  @admin_listener_port = admin_listener_port
  @admin_listener_host = admin_listener_host
  @admin_token         = admin_token
end
Instance Method Details
#run ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/hyperion/worker.rb', line 62

# Runs this worker: logs the startup entry, builds the Server, fires the
# boot hooks, publishes the io_uring gauge, attaches the listener, installs
# signal handlers, and then blocks in +server.start+. When +server.start+
# returns or raises, the gauge is reset to 0 and the shutdown hooks run.
#
# @return [Object] whatever the final +ensure+-protected +server.start+
#   expression yields
def run
  protocol = @tls ? 'https' : 'http'
  Hyperion.logger.info do
    {
      message: 'worker listening',
      pid: Process.pid,
      worker_index: @worker_index,
      url: "#{protocol}://#{@host}:#{@port}"
    }
  end

  server_options = {
    host: @host,
    port: @port,
    app: @app,
    read_timeout: @read_timeout,
    tls: @tls,
    thread_count: @thread_count,
    max_pending: @max_pending,
    max_request_read_seconds: @max_request_read_seconds,
    h2_settings: @h2_settings,
    async_io: @async_io,
    runtime: @runtime,
    accept_fibers_per_worker: @accept_fibers_per_worker,
    h2_max_total_streams: @h2_max_total_streams,
    admin_listener_port: @admin_listener_port,
    admin_listener_host: @admin_listener_host,
    admin_token: @admin_token,
    tls_session_cache_size: @tls_session_cache_size,
    tls_ktls: @tls_ktls,
    io_uring: @io_uring,
    max_in_flight_per_conn: @max_in_flight_per_conn,
    tls_handshake_rate_limit: @tls_handshake_rate_limit,
    preload_static_dirs: @preload_static_dirs
  }
  server = Server.new(**server_options)

  # Boot hooks run in the forked child BEFORE the listener is adopted or
  # bound, and therefore before any accept. This is where application code
  # re-establishes DB/Redis pools so each worker owns its connections. The
  # hook receives the worker's slot index (0..workers-1), which apps can use
  # to shard background work.
  #
  # History: before 1.6.3 the hooks fired AFTER listener setup. In
  # `:reuseport` mode the kernel could then queue connections on this
  # worker's socket while the hook was still warming pools, which showed up
  # as first-request latency spikes against a handler that was not ready.
  # Running the hooks first makes `:share` and `:reuseport` behave alike:
  # this worker has no socket until every boot hook has returned.
  @config.on_worker_boot.each { |hook| hook.call(@worker_index) }

  # 2.4-C: publish the per-worker io_uring gauge. The Server constructor
  # has already called `Hyperion::IOUring.resolve_policy!`; a truthy result
  # means this worker uses the io_uring accept path, reported as 1.
  gauge_value =
    if @io_uring != :off && Hyperion::IOUring.resolve_policy!(@io_uring)
      1
    else
      0
    end
  Hyperion.metrics.set_gauge(:hyperion_io_uring_workers_active, gauge_value, [Process.pid.to_s])

  # Use the listener handed over by the master if there is one; otherwise
  # build this worker's own reuseport listener.
  listening_socket = @listener || build_reuseport_listener
  server.adopt_listener(listening_socket)

  # TERM and INT both stop the server.
  %w[TERM INT].each do |signal_name|
    Signal.trap(signal_name) { server.stop }
  end
  install_tls_rotation_signal_handler(server)

  begin
    server.start
  ensure
    # 2.4-C: zero this worker's io_uring gauge slot so scrapes taken after
    # shutdown do not report the policy as still active.
    Hyperion.metrics.set_gauge(:hyperion_io_uring_workers_active, 0, [Process.pid.to_s])

    # Shutdown hooks fire once the accept loop has exited, whether through
    # a graceful SIGTERM or a hard error. Intended for flushing metrics,
    # closing DB connections cleanly, and similar teardown.
    @config.on_worker_shutdown.each { |hook| hook.call(@worker_index) }
  end
end