Class: Hyperion::ThreadPool
- Inherits:
  Object
  - Object
  - Hyperion::ThreadPool
- Defined in:
- lib/hyperion/thread_pool.rb
Overview
Thread pool for Rack dispatch. Has two modes:
-
`#submit_connection(socket, app)` — HTTP/1.1 path. The whole socket is handed to a worker thread, which runs `Connection#serve(socket, app)` directly with `thread_pool: nil` (the worker IS the pool). Zero per-request hop, one OS thread per in-flight connection — Puma's model.
-
`#call(app, request)` — old hop-based API. Used by Http2Handler, where each h2 stream runs on a fiber inside the connection fiber and DOES need the cross-thread hop for `app.call(env)`.
Why we need this: synchronous Rack handlers (Rails dev-mode reloader, ActiveRecord, many gems) hold global mutexes that serialize work across fibers on a single thread. Fibers give us cheap connection counts but cannot deliver true parallelism for blocking handlers. The thread pool gives us Puma-style OS-thread concurrency for `app.call(env)` while the accept loop stays on fibers.
Cross-thread fiber wakeup (for the legacy `#call` path): on Ruby 3.2+ with the Async fiber scheduler, `Queue#pop` is fiber-aware — the fiber yields cooperatively while waiting on the queue. Verified experimentally on Ruby 3.3.3.
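The hop mechanism boils down to two `Queue`s: an inbox the caller pushes into and a reply queue the worker pushes back on. A minimal plain-threads sketch (under the Async scheduler the caller's `reply.pop` would yield its fiber instead of blocking the OS thread):

```ruby
inbox = Queue.new

worker = Thread.new do
  job, reply_q = inbox.pop   # block until a job arrives
  reply_q << job.call        # push exactly one result back
end

reply = Queue.new
inbox << [-> { 21 * 2 }, reply]  # hand the job and its reply queue to the worker
result = reply.pop               # caller waits here for the worker's push
worker.join
result # => 42
```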
Constant Summary collapse
- SHUTDOWN =
:__hyperion_thread_pool_shutdown__
Instance Attribute Summary collapse
-
#max_pending ⇒ Object
readonly
Returns the value of attribute max_pending.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
-
#call(app, request) ⇒ Object
HTTP/2 + sub-call path: hop one `app.call` from the calling fiber to a worker thread.
-
#initialize(size:, max_pending: nil) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #shutdown ⇒ Object
-
#submit_connection(socket, app, max_request_read_seconds: 60) ⇒ Object
HTTP/1.1 path: hand the whole socket to a worker thread.
Constructor Details
#initialize(size:, max_pending: nil) ⇒ ThreadPool
Returns a new instance of ThreadPool.
# File 'lib/hyperion/thread_pool.rb', line 31

def initialize(size:, max_pending: nil)
  @size = size
  @max_pending = max_pending
  @inbox = Queue.new # multiplexes both kinds of jobs
  # Pre-allocate one reply queue per in-flight slot for the legacy `#call`
  # path. Bounded by `size`: if all workers are busy, all reply queues are
  # checked out, and the next caller blocks on `@reply_pool.pop` until a
  # worker frees one. That's the correct backpressure shape.
  @reply_pool = Queue.new
  size.times { @reply_pool << Queue.new }
  @workers = Array.new(size) { spawn_worker }
end
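The reply-pool backpressure described in the comment can be sketched in isolation. Names here are illustrative, not the class's internals:

```ruby
# Pre-allocate `size` reply queues: at most `size` hop-style callers can
# be in flight at once, and the (size + 1)th caller blocks on pop.
size = 2
reply_pool = Queue.new
size.times { reply_pool << Queue.new }

checked_out = Array.new(size) { reply_pool.pop } # all slots in use
begin
  reply_pool.pop(true)                           # non-blocking pop
rescue ThreadError
  blocked = true                                 # a third caller would block here
end
reply_pool << checked_out.pop                    # a worker frees a slot
blocked           # => true
reply_pool.size   # => 1
```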
Instance Attribute Details
#max_pending ⇒ Object (readonly)
Returns the value of attribute max_pending.
# File 'lib/hyperion/thread_pool.rb', line 29

def max_pending
  @max_pending
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
# File 'lib/hyperion/thread_pool.rb', line 29

def size
  @size
end
Instance Method Details
#call(app, request) ⇒ Object
HTTP/2 + sub-call path: hop one `app.call` from the calling fiber to a worker thread. The fiber yields until the worker pushes the result back.
Reply-queue lifecycle invariant: `@reply_pool` always contains queues that are empty. We check one out, hand it to the worker, the worker pushes exactly one result, we pop it, then return the queue to the pool. If `app.call` raises, the worker still pushes a 500 result — see `spawn_worker`.
# File 'lib/hyperion/thread_pool.rb', line 74

def call(app, request)
  reply = @reply_pool.pop
  @inbox << [:call, app, request, reply]
  result = reply.pop
  @reply_pool << reply
  result
end
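`spawn_worker` itself is not shown on this page; the sketch below illustrates the stated invariant (exactly one push per job, even on exception) with a hypothetical 500-tuple fallback. The exact error shape in Hyperion may differ:

```ruby
inbox = Queue.new
worker = Thread.new do
  app, reply_q = inbox.pop
  result =
    begin
      app.call
    rescue => e
      [500, {}, ["error: #{e.message}"]] # hypothetical 500 shape
    end
  reply_q << result # exactly one push, success or failure
end

reply = Queue.new
inbox << [-> { raise "boom" }, reply]
status, = reply.pop # caller's pop cannot deadlock
worker.join
status # => 500
```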
#shutdown ⇒ Object
# File 'lib/hyperion/thread_pool.rb', line 82

def shutdown
  @size.times { @inbox << SHUTDOWN }
  @workers.each { |t| t.join(5) }
end
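Sentinel-based shutdown in isolation: pushing one token per worker means each worker pops exactly one token and exits, and the bounded `join` caps the wait. A self-contained sketch, with a local `shutdown_token` standing in for the `SHUTDOWN` constant:

```ruby
shutdown_token = :__shutdown__
inbox = Queue.new
workers = Array.new(3) do
  Thread.new do
    loop do
      job = inbox.pop
      break if job == shutdown_token # each worker consumes one token
      job.call
    end
  end
end

workers.size.times { inbox << shutdown_token }
workers.each { |t| t.join(5) }
workers.none?(&:alive?) # => true
```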
#submit_connection(socket, app, max_request_read_seconds: 60) ⇒ Object
HTTP/1.1 path: hand the whole socket to a worker thread. The worker runs `Connection#serve(socket, app)` directly. No per-request hop. Returns immediately — caller does not wait.
Returns true on enqueue, false on rejection. When `max_pending` is set and the inbox already has at least that many entries, the connection is rejected and the decision is surfaced to the caller (Server emits a 503 and closes the socket). Without `max_pending` (default nil) the queue is unbounded and we always return true — preserves pre-1.2 behaviour.
The check is inherently racy with worker drain — workers may pop between our `size` read and the `<<`. Backpressure is statistical, not strict. Off-by-one over the configured cap during a thundering accept burst is acceptable; the cost of stricter sync would be a mutex on every enqueue, which we won't pay on the hot path.
# File 'lib/hyperion/thread_pool.rb', line 59

def submit_connection(socket, app, max_request_read_seconds: 60)
  return false if @max_pending && @inbox.size >= @max_pending
  @inbox << [:connection, socket, app, max_request_read_seconds]
  true
end
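With no workers draining the queue, the admission check behaves like this standalone sketch (names are illustrative):

```ruby
# Advisory cap: a plain size comparison before enqueue, no mutex, so
# concurrent submitters can overshoot the cap slightly under load.
inbox = Queue.new
max_pending = 2

submit = lambda do |job|
  return false if max_pending && inbox.size >= max_pending
  inbox << job
  true
end

accepted = 4.times.map { |i| submit.call([:connection, i]) }
accepted    # => [true, true, false, false]
inbox.size  # => 2
```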