Module: Hyperion::AsyncPg::ForkSafe
- Defined in:
- lib/hyperion/async_pg/fork_safe.rb
Overview
Auto-invalidates registered pools across fork boundaries so users don’t have to wire ‘on_worker_boot` (Hyperion / Puma cluster mode) themselves. Hooks `Process._fork` (Ruby 3.1+) to forget any connections held by the parent — the child will recreate them on first use.
Background: pre-fork servers (Hyperion ‘-w N`, Puma cluster, Falcon multi-worker) load the rackup in the master process and then fork children. Any `PG::Connection` opened in the master is shared with every child via inherited file descriptors. Concurrent reads/writes on the same fd interleave bytes and corrupt the wire protocol —symptom is `PG::UnableToSend: another command is already in progress` on every request, ~99.99 % 5xx under load. The classic workaround is “open the pool in `on_worker_boot`”, which requires a separate config file. ForkSafe lets you keep the pool open at the rackup level and have it transparently re-init in each child.
Usage:
require 'hyperion/async_pg/fork_safe'
Hyperion::AsyncPg::ForkSafe.install!
$pg_pool = Hyperion::AsyncPg::ForkSafe.register(
Hyperion::AsyncPg::FiberPool.new(size: 64) do
PG.connect(ENV['DATABASE_URL'])
end
)
$pg_pool.fill # master: opens 64 conns. Child: refills lazily on first .with.
Or via the kitchen-sink one-liner on the main shim:
Hyperion::AsyncPg.install!(activerecord: true, fork_safe: true)
Registered pools must respond to ‘#reset_after_fork`. The shipped `Hyperion::AsyncPg::FiberPool` implements it as a metadata-only reset (drops parent’s connection refs without calling ‘#close` on them — those file descriptors are now in use by the child kernel- side, and closing them in the parent would yank them out from under the child too).
Defined Under Namespace
Modules: Hook Classes: IncompatiblePoolError
Class Method Summary collapse
-
.__reset_for_specs__ ⇒ Object
Test-only: clear pool registry + flip the install flag so the next ‘install!` is a fresh idempotency test.
-
.install! ⇒ Object
Install the ‘Process._fork` hook.
- .installed? ⇒ Boolean
-
.register(pool) ⇒ Object
Register a pool to be reset on fork.
-
.reset_all_pools_in_child! ⇒ Object
Reset all registered pools — called from the fork hook in the child process AFTER ‘fork(2)` returns 0.
Class Method Details
.__reset_for_specs__ ⇒ Object
Test-only: clear pool registry + flip the install flag so the next ‘install!` is a fresh idempotency test. We do NOT and CANNOT un-prepend `Hook` from `Process.singleton_class` —Ruby has no public API for that. Once prepended (by the first spec that calls `install!`), `Hook` stays in the ancestor chain for the rest of the suite. That’s safe because ‘Hook#_fork` only calls `reset_all_pools_in_child!`, and we clear the pool registry between examples — so the leftover hook + empty registry is a no-op.
140 141 142 143 |
# File 'lib/hyperion/async_pg/fork_safe.rb', line 140 def __reset_for_specs__ @pools_mutex.synchronize { @pools.clear } @install_mutex.synchronize { @installed = false } end |
.install! ⇒ Object
Install the ‘Process._fork` hook. Idempotent + thread-safe. Returns `true` if this call wired the hook, `false` if the hook was already installed (or this Ruby lacks `Process._fork`).
No-op on Rubies older than 3.1 (warns once on stderr) — without ‘Process._fork` there’s no reliable, library-friendly way to detect the fork boundary; users on those Rubies must use ‘on_worker_boot` from their server config instead.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/hyperion/async_pg/fork_safe.rb', line 77 def install! installed_now = false @install_mutex.synchronize do return false if @installed unless ::Process.respond_to?(:_fork) warn '[hyperion-async-pg] ForkSafe: Process._fork unavailable on this Ruby — fork detection disabled' return false end install_hook! @installed = true installed_now = true end installed_now end |
.installed? ⇒ Boolean
93 94 95 |
# File 'lib/hyperion/async_pg/fork_safe.rb', line 93 def installed? @installed end |
.register(pool) ⇒ Object
Register a pool to be reset on fork. Returns the pool unchanged for chaining: ‘$pg_pool = ForkSafe.register(FiberPool.new(…))`.
The pool must respond to ‘#reset_after_fork`. Anything else is rejected up-front to surface integration mistakes at boot rather than after a child fork.
103 104 105 106 107 108 109 110 |
# File 'lib/hyperion/async_pg/fork_safe.rb', line 103 def register(pool) unless pool.respond_to?(:reset_after_fork) raise IncompatiblePoolError, "pool must respond to #reset_after_fork (got #{pool.class})" end @pools_mutex.synchronize { @pools << pool } pool end |
.reset_all_pools_in_child! ⇒ Object
Reset all registered pools — called from the fork hook in the child process AFTER ‘fork(2)` returns 0. Forgets parent’s connection refs (without closing them — child’s fds, parent owns the OS-level closing).
One bad pool’s ‘#reset_after_fork` raising must NOT prevent the rest from being reset; otherwise a single buggy pool can poison every other pool in the child and you’re back to the fd-sharing corruption this whole module exists to prevent.
121 122 123 124 125 126 127 128 129 |
# File 'lib/hyperion/async_pg/fork_safe.rb', line 121 def reset_all_pools_in_child! @pools_mutex.synchronize do @pools.each do |pool| pool.reset_after_fork rescue StandardError => e warn "[hyperion-async-pg] ForkSafe: pool reset raised: #{e.class}: #{e.}" end end end |