Class: Hyperion::AsyncPg::FiberPool
- Inherits:
-
Object
- Object
- Hyperion::AsyncPg::FiberPool
- Defined in:
- lib/hyperion/async_pg/fiber_pool.rb
Overview
Fiber-aware connection pool. The popular ‘connection_pool` gem uses a Mutex + ConditionVariable that don’t yield to the Async fiber scheduler — a fiber waiting for a connection blocks the whole OS thread and the async-pg shim’s win evaporates.
FiberPool wraps Async::Semaphore (which IS fiber-aware) around a plain Array. acquire = wait for a slot via the semaphore (fibers cooperate, OS thread keeps serving others). pop/push around the Array is atomic per-thread under GVL.
Usage:
pool = Hyperion::AsyncPg::FiberPool.new(size: 64) do
PG.connect(ENV['DATABASE_URL'])
end
pool.with do |conn|
conn.exec_params('SELECT * FROM users WHERE id = $1', [user_id])
end
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
-
#close ⇒ Object
Close every connection in the pool.
-
#fill ⇒ Object
Open ‘size` connections (in parallel by default — over WAN with ~50-400 ms PG.connect latencies, sequential fill of large pools blows past server-bind windows).
-
#initialize(size:, parallel_fill_threads: 8, &factory) ⇒ FiberPool
constructor
A new instance of FiberPool.
-
#reset_after_fork ⇒ Object
Drop the parent process’s connection refs after fork(2).
-
#with ⇒ Object
Check out a connection, run the block, return it.
Constructor Details
#initialize(size:, parallel_fill_threads: 8, &factory) ⇒ FiberPool
Returns a new instance of FiberPool.
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 26 def initialize(size:, parallel_fill_threads: 8, &factory) raise ArgumentError, 'size must be a positive Integer' unless size.is_a?(Integer) && size.positive? raise ArgumentError, 'block (factory) is required' unless factory @size = size @factory = factory @available = [] @semaphore = ::Async::Semaphore.new(size) @parallel_fill_threads = parallel_fill_threads @mutex = Mutex.new @filled = false end |
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
39 40 41 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 39 def size @size end |
Instance Method Details
#close ⇒ Object
Close every connection in the pool. Safe to call from a worker shutdown hook so PG sockets get a clean FIN instead of being GC’d later. Calls ‘#close` on each connection if it responds.
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 74 def close @mutex.synchronize do while (conn = @available.pop) begin conn.close if conn.respond_to?(:close) rescue StandardError # conn already broken — keep closing the rest end end @filled = false end self end |
#fill ⇒ Object
Open ‘size` connections (in parallel by default — over WAN with ~50-400 ms PG.connect latencies, sequential fill of large pools blows past server-bind windows). Idempotent: subsequent calls are no-ops. Returns self for chaining.
45 46 47 48 49 50 51 52 53 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 45 def fill @mutex.synchronize do return self if @filled @available.concat(parallel_fill(@size)) @filled = true end self end |
#reset_after_fork ⇒ Object
Drop the parent process’s connection refs after fork(2). The child process must NOT close these — the file descriptors are inherited but the parent owns the orderly close. We just forget about them so the next #with triggers a fresh factory.call() on the child’s first use.
Called automatically when ForkSafe.install! is active. Safe to call manually for hand-managed pool lifecycles.
96 97 98 99 100 101 102 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 96 def reset_after_fork @mutex.synchronize do @available.clear @filled = false end self end |
#with ⇒ Object
Check out a connection, run the block, return it. Yields the connection. If the pool isn’t filled yet, fills it lazily.
57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/hyperion/async_pg/fiber_pool.rb', line 57 def with fill unless @filled @semaphore.acquire do conn = @available.pop raise 'FiberPool: empty pool — semaphore + Array invariant violated' if conn.nil? begin yield conn ensure @available.push(conn) end end end |