Module: Hyperion::AsyncPg
- Defined in:
- lib/hyperion/async_pg.rb,
lib/hyperion/async_pg/version.rb
Overview
Companion gem for Hyperion HTTP server: makes pg’s Connection#exec_params (and friends) yield to the Async fiber scheduler while waiting on the Postgres TCP socket. Outside a scheduler (no current fiber, or a fiber not driven by an Async-compatible scheduler), behaviour is identical to plain pg — IO#wait_readable falls back to its blocking implementation.
Usage:
require 'hyperion/async_pg'
Hyperion::AsyncPg.install!
After install, every subsequent PG::Connection#exec_params / exec / query call routes through the async-aware variant. ActiveRecord, Sequel, ROM, and direct ‘pg` callers all benefit transparently.
Defined Under Namespace
Constant Summary collapse
- DEFAULT_METHODS =
Methods on PG::Connection that we redirect through the async dispatch loop. Each maps to a non-blocking send_* counterpart inside the C extension; see SEND_METHODS below.
%i[ exec exec_params exec_prepared query async_exec async_exec_params ].freeze
- SEND_METHODS =
{ exec: :send_query, query: :send_query, async_exec: :send_query, exec_params: :send_query_params, async_exec_params: :send_query_params, exec_prepared: :send_query_prepared }.freeze
- VERSION =
'0.1.1'
Class Method Summary collapse
-
.dispatch(conn, send_method, args, kwargs, &block) ⇒ Object
Internal entry point used by Patches.
- .drain_results(conn) ⇒ Object
- .install!(methods: DEFAULT_METHODS) ⇒ Object
- .installed? ⇒ Boolean
-
.read_timeout ⇒ Object
Read timeout for a single wait_readable call.
-
.reset_for_tests! ⇒ Object
Resets internal install flag.
- .wait_until_ready(conn) ⇒ Object
Class Method Details
.dispatch(conn, send_method, args, kwargs, &block) ⇒ Object
Internal entry point used by Patches. Public so the prepended module can call it without exposing the patched method to send().
82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/hyperion/async_pg.rb', line 82 def dispatch(conn, send_method, args, kwargs, &block) if kwargs.empty? conn.public_send(send_method, *args) else conn.public_send(send_method, *args, **kwargs) end wait_until_ready(conn) result = drain_results(conn) result&.check return yield result if block result end |
.drain_results(conn) ⇒ Object
110 111 112 113 114 115 116 117 |
# File 'lib/hyperion/async_pg.rb', line 110 def drain_results(conn) last = nil while (r = conn.get_result) last&.clear last = r end last end |
.install!(methods: DEFAULT_METHODS) ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/hyperion/async_pg.rb', line 49 def install!(methods: DEFAULT_METHODS) @install_mutex.synchronize do return false if @installed ::PG::Connection.prepend(Patches.new(methods)) @installed = true end true end |
.installed? ⇒ Boolean
59 60 61 |
# File 'lib/hyperion/async_pg.rb', line 59 def installed? @installed end |
.read_timeout ⇒ Object
Read timeout for a single wait_readable call. nil = block forever (matches pg’s default; rely on server-side statement_timeout to cap runaway queries). Tunable via env so operators don’t need a redeploy.
73 74 75 76 77 78 |
# File 'lib/hyperion/async_pg.rb', line 73 def read_timeout v = ENV['HYPERION_ASYNC_PG_READ_TIMEOUT'] return nil if v.nil? || v.empty? v.to_f end |
.reset_for_tests! ⇒ Object
Resets internal install flag. Test-only — does NOT remove the prepended module from PG::Connection’s ancestor chain (Ruby has no public API for un-prepending). Useful for verifying the idempotency guard in specs.
66 67 68 |
# File 'lib/hyperion/async_pg.rb', line 66 def reset_for_tests! @install_mutex.synchronize { @installed = false } end |
.wait_until_ready(conn) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/hyperion/async_pg.rb', line 96 def wait_until_ready(conn) timeout = read_timeout loop do conn.consume_input break unless conn.is_busy # IO#wait_readable cooperates with Fiber.scheduler when one is # current (Async sets it on every Async::Task). Without a scheduler # this blocks the OS thread, identical to pg's normal behaviour. ready = conn.socket_io.wait_readable(timeout) raise ::PG::ConnectionBad, 'pg socket wait_readable timed out' if ready.nil? end end |