Module: Hyperion::AsyncPg
- Defined in:
  lib/hyperion/async_pg.rb,
  lib/hyperion/async_pg/version.rb,
  lib/hyperion/async_pg/fork_safe.rb,
  lib/hyperion/async_pg/fiber_pool.rb,
  lib/hyperion/async_pg/active_record_adapter.rb
Overview
Companion gem for the Hyperion HTTP server: it makes pg’s Connection#exec_params (and friends) yield to the Async fiber scheduler while waiting on the Postgres TCP socket. Outside a scheduler (no current fiber, or a fiber not driven by an Async-compatible scheduler), behaviour is identical to plain pg, because IO#wait_readable falls back to its blocking implementation.
Usage:
require 'hyperion/async_pg'
Hyperion::AsyncPg.install!
After install, every subsequent PG::Connection#exec_params / exec / query call routes through the async-aware variant. ActiveRecord, Sequel, ROM, and direct `pg` callers all benefit transparently.
Defined Under Namespace
Modules: ActiveRecordAdapter, ForkSafe
Classes: Error, FiberPool, Patches
Constant Summary
- DEFAULT_METHODS =
  Methods on PG::Connection that we redirect through the async dispatch loop. Each maps to a non-blocking send_* counterpart inside the C extension; see SEND_METHODS below.
    %i[exec exec_params exec_prepared query async_exec async_exec_params].freeze
- SEND_METHODS =
    {
      exec: :send_query,
      query: :send_query,
      async_exec: :send_query,
      exec_params: :send_query_params,
      async_exec_params: :send_query_params,
      exec_prepared: :send_query_prepared
    }.freeze
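The mapping above can be illustrated with a small, self-contained sketch of prepend-based redirection. This is not the gem's Patches class, whose body is not shown on this page. `FakeConnection`, `SketchPatches`, and `SKETCH_SEND_METHODS` are hypothetical names, and the sketch forwards straight to the send_* counterpart instead of going through a dispatch routine.

```ruby
# Hypothetical stand-ins: none of these names belong to pg or Hyperion::AsyncPg.
SKETCH_SEND_METHODS = {
  exec: :send_query,
  exec_params: :send_query_params
}.freeze

class FakeConnection
  attr_reader :log

  def initialize
    @log = []
  end

  # Stands in for the original blocking method.
  def exec(sql)
    @log << [:blocking_exec, sql]
  end

  # Stands in for the non-blocking counterpart.
  def send_query(sql)
    @log << [:send_query, sql]
  end
end

# A Module subclass: each instance defines one override per requested method.
class SketchPatches < Module
  def initialize(methods)
    super()
    methods.each do |name|
      send_name = SKETCH_SEND_METHODS.fetch(name)
      define_method(name) do |*args, **kwargs|
        # A real implementation would also wait for and collect the result;
        # the sketch only shows the redirection step.
        public_send(send_name, *args, **kwargs)
      end
    end
  end
end

# Prepending puts the override ahead of FakeConnection#exec in method lookup.
FakeConnection.prepend(SketchPatches.new(%i[exec]))

conn = FakeConnection.new
conn.exec('SELECT 1')
p conn.log # prints [[:send_query, "SELECT 1"]]
```

Because the module is prepended rather than included, callers keep using `exec` unchanged while the override runs first.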
- VERSION =
'0.5.1'
Class Method Summary
- .dispatch(conn, send_method, args, kwargs, &block) ⇒ Object
  Internal entry point used by Patches.
- .drain_results(conn) ⇒ Object
- .install!(methods: DEFAULT_METHODS, activerecord: false, fork_safe: false) ⇒ Object
  Boot-time install.
- .installed? ⇒ Boolean
- .read_timeout ⇒ Object
  Read timeout for a single wait_readable call.
- .reset_for_tests! ⇒ Object
  Resets internal install flag.
- .wait_until_ready(conn) ⇒ Object
Class Method Details
.dispatch(conn, send_method, args, kwargs, &block) ⇒ Object
Internal entry point used by Patches. Public so the prepended module can call it without exposing the patched method to send().
    # File 'lib/hyperion/async_pg.rb', line 109
    def dispatch(conn, send_method, args, kwargs, &block)
      if kwargs.empty?
        conn.public_send(send_method, *args)
      else
        conn.public_send(send_method, *args, **kwargs)
      end
      wait_until_ready(conn)
      result = drain_results(conn)
      result&.check
      return yield result if block

      result
    end
.drain_results(conn) ⇒ Object
    # File 'lib/hyperion/async_pg.rb', line 137
    def drain_results(conn)
      last = nil
      while (r = conn.get_result)
        last&.clear
        last = r
      end
      last
    end
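To make the drain loop's effect concrete, here is a minimal sketch that runs the same loop shape against fake objects. `FakeResult` and `FakeConn` are hypothetical stand-ins that imitate only the `get_result` / `clear` protocol; they are not pg classes.

```ruby
# Hypothetical stand-in for PG::Result: records whether clear was called.
class FakeResult
  attr_reader :label

  def initialize(label)
    @label = label
    @cleared = false
  end

  def clear
    @cleared = true
  end

  def cleared?
    @cleared
  end
end

# Hypothetical stand-in for PG::Connection: hands out queued results,
# then nil once nothing is pending.
class FakeConn
  def initialize(results)
    @queue = results.dup
  end

  def get_result
    @queue.shift
  end
end

# Same loop shape as drain_results in the listing.
def drain_results_sketch(conn)
  last = nil
  while (r = conn.get_result)
    last&.clear # free every result except the most recent one
    last = r
  end
  last
end

results = [FakeResult.new(:first), FakeResult.new(:second)]
final = drain_results_sketch(FakeConn.new(results))
puts final.label         # prints "second"
puts results[0].cleared? # prints "true"
puts results[1].cleared? # prints "false"
```

Only the final result is returned to the caller, and every earlier one is cleared along the way.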
.install!(methods: DEFAULT_METHODS, activerecord: false, fork_safe: false) ⇒ Object
Boot-time install. Idempotent + thread-safe.
Options:
methods: list of PG::Connection methods to patch (default: DEFAULT_METHODS).
activerecord: when true, also flips ActiveSupport::IsolatedExecutionState
to :fiber so AR's connection pool keys on the running
fiber instead of the OS thread. Required for AR + fiber
schedulers; see ActiveRecordAdapter for details.
fork_safe: when true, also wires Process._fork (Ruby 3.1+) so any
pool registered with ForkSafe.register is reset in
each child process — eliminates the pre-fork PG fd
sharing trap on Hyperion `-w N` / Puma cluster mode
without forcing users to write `on_worker_boot`.
Backward-compatible: prior callers used `install!` with no args. The PG patch is idempotent: the first call wins, and subsequent calls return false without re-prepending. The `activerecord:` and `fork_safe:` installs are independent and idempotent in their own right (each has its own internal guard), so they run on every call where the kwarg is true.
    # File 'lib/hyperion/async_pg.rb', line 72
    def install!(methods: DEFAULT_METHODS, activerecord: false, fork_safe: false)
      first_install = false
      @install_mutex.synchronize do
        unless @installed
          ::PG::Connection.prepend(Patches.new(methods))
          @installed = true
          first_install = true
        end
      end
      ActiveRecordAdapter.install! if activerecord
      ForkSafe.install! if fork_safe
      first_install
    end
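The "first call wins" behaviour under concurrency can be sketched with a standalone guard. `InstallGuard`, `SketchTarget`, and `SketchPatch` are hypothetical names used only for illustration. The guard copies the mutex-plus-flag pattern from the listing and does not touch PG::Connection.

```ruby
# Hypothetical guard reproducing the mutex + flag pattern of install!.
class InstallGuard
  def initialize
    @mutex = Mutex.new
    @installed = false
  end

  # Returns true only for the call that actually performed the prepend.
  def install!(target, patch)
    first_install = false
    @mutex.synchronize do
      unless @installed
        target.prepend(patch)
        @installed = true
        first_install = true
      end
    end
    first_install
  end
end

class SketchTarget; end
SketchPatch = Module.new

guard = InstallGuard.new
threads = Array.new(8) { Thread.new { guard.install!(SketchTarget, SketchPatch) } }
outcomes = threads.map(&:value)

puts outcomes.count(true)                      # prints "1"
puts SketchTarget.ancestors.count(SketchPatch) # prints "1"
```

Checking and setting the flag inside the mutex is what ensures exactly one caller sees `true`, regardless of thread interleaving.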
.installed? ⇒ Boolean
    # File 'lib/hyperion/async_pg.rb', line 86
    def installed?
      @installed
    end
.read_timeout ⇒ Object
Read timeout for a single wait_readable call. nil = block forever (matches pg’s default; rely on server-side statement_timeout to cap runaway queries). Tunable via env so operators don’t need a redeploy.
    # File 'lib/hyperion/async_pg.rb', line 100
    def read_timeout
      v = ENV['HYPERION_ASYNC_PG_READ_TIMEOUT']
      return nil if v.nil? || v.empty?

      v.to_f
    end
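The parsing rules are easier to see with a few inputs. The sketch below repeats the listing's logic in a hypothetical helper, `read_timeout_sketch`, which takes the environment as a parameter so it can be exercised without changing the process environment. The non-numeric case follows from how String#to_f works: it returns 0.0 when the string has no leading number.

```ruby
# Hypothetical helper: same parsing as read_timeout, with the env injected.
def read_timeout_sketch(env = ENV)
  v = env['HYPERION_ASYNC_PG_READ_TIMEOUT']
  return nil if v.nil? || v.empty?

  v.to_f
end

key = 'HYPERION_ASYNC_PG_READ_TIMEOUT'
p read_timeout_sketch({})             # prints nil  (unset: wait indefinitely)
p read_timeout_sketch(key => '')      # prints nil  (empty counts as unset)
p read_timeout_sketch(key => '2.5')   # prints 2.5
p read_timeout_sketch(key => '30s')   # prints 30.0 (trailing text is ignored)
p read_timeout_sketch(key => 'abc')   # prints 0.0  (no leading number)
```

A non-numeric value therefore yields a zero timeout rather than disabling it, so operators may want to double-check the variable's spelling and format.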
.reset_for_tests! ⇒ Object
Resets internal install flag. Test-only — does NOT remove the prepended module from PG::Connection’s ancestor chain (Ruby has no public API for un-prepending). Useful for verifying the idempotency guard in specs.
    # File 'lib/hyperion/async_pg.rb', line 93
    def reset_for_tests!
      @install_mutex.synchronize { @installed = false }
    end
.wait_until_ready(conn) ⇒ Object
    # File 'lib/hyperion/async_pg.rb', line 123
    def wait_until_ready(conn)
      timeout = read_timeout
      loop do
        conn.consume_input
        break unless conn.is_busy

        # IO#wait_readable cooperates with Fiber.scheduler when one is
        # current (Async sets it on every Async::Task). Without a scheduler
        # this blocks the OS thread, identical to pg's normal behaviour.
        ready = conn.socket_io.wait_readable(timeout)
        raise ::PG::ConnectionBad, 'pg socket wait_readable timed out' if ready.nil?
      end
    end
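The timeout branch relies on IO#wait_readable returning nil when the timeout elapses and a truthy value once data is pending. A minimal sketch of that contract follows, using an ordinary pipe in place of the Postgres socket and no fiber scheduler. `pipe_wait_demo` is a hypothetical helper, not part of the gem.

```ruby
require 'io/wait'

# Returns [result_while_idle, result_after_write] for a fresh pipe.
def pipe_wait_demo(timeout)
  reader, writer = IO.pipe
  idle = reader.wait_readable(timeout)  # nothing buffered: nil after the timeout
  writer.write('x')
  writer.flush
  ready = reader.wait_readable(timeout) # one byte pending: truthy
  [idle, ready]
ensure
  reader&.close
  writer&.close
end

idle, ready = pipe_wait_demo(0.05)
puts idle.nil?       # prints "true"
puts ready ? 'ready' : 'timed out' # prints "ready"
```

In wait_until_ready the nil case is what gets turned into PG::ConnectionBad. With a nil timeout the call would wait indefinitely instead.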