Module: Hyperion::AsyncPg

Defined in:
lib/hyperion/async_pg.rb,
lib/hyperion/async_pg/version.rb,
lib/hyperion/async_pg/fork_safe.rb,
lib/hyperion/async_pg/fiber_pool.rb,
lib/hyperion/async_pg/active_record_adapter.rb

Overview

Companion gem for Hyperion HTTP server: makes pg’s Connection#exec_params (and friends) yield to the Async fiber scheduler while waiting on the Postgres TCP socket. Outside a scheduler (no current fiber, or a fiber not driven by an Async-compatible scheduler), behaviour is identical to plain pg — IO#wait_readable falls back to its blocking implementation.

Usage:

require 'hyperion/async_pg'
Hyperion::AsyncPg.install!

After install, every subsequent PG::Connection#exec_params / exec / query call routes through the async-aware variant. ActiveRecord, Sequel, ROM, and direct ‘pg` callers all benefit transparently.

Defined Under Namespace

Modules: ActiveRecordAdapter, ForkSafe Classes: Error, FiberPool, Patches

Constant Summary collapse

DEFAULT_METHODS =

Methods on PG::Connection that we redirect through the async dispatch loop. Each maps to a non-blocking send_* counterpart inside the C extension; see SEND_METHODS below.

%i[
  exec
  exec_params
  exec_prepared
  query
  async_exec
  async_exec_params
].freeze
SEND_METHODS =
{
  exec: :send_query,
  query: :send_query,
  async_exec: :send_query,
  exec_params: :send_query_params,
  async_exec_params: :send_query_params,
  exec_prepared: :send_query_prepared
}.freeze
VERSION =
'0.5.1'

Class Method Summary collapse

Class Method Details

.dispatch(conn, send_method, args, kwargs, &block) ⇒ Object

Internal entry point used by Patches. Public so the prepended module can call it without exposing the patched method to send().



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/hyperion/async_pg.rb', line 109

def dispatch(conn, send_method, args, kwargs, &block)
  if kwargs.empty?
    conn.public_send(send_method, *args)
  else
    conn.public_send(send_method, *args, **kwargs)
  end
  wait_until_ready(conn)
  result = drain_results(conn)
  result&.check
  return yield result if block

  result
end

.drain_results(conn) ⇒ Object



137
138
139
140
141
142
143
144
# File 'lib/hyperion/async_pg.rb', line 137

def drain_results(conn)
  last = nil
  while (r = conn.get_result)
    last&.clear
    last = r
  end
  last
end

.install!(methods: DEFAULT_METHODS, activerecord: false, fork_safe: false) ⇒ Object

Boot-time install. Idempotent + thread-safe.

Options:

methods:        list of PG::Connection methods to patch (default: DEFAULT_METHODS).
activerecord:   when true, also flips ActiveSupport::IsolatedExecutionState
                to :fiber so AR's connection pool keys on the running
                fiber instead of the OS thread. Required for AR + fiber
                schedulers; see ActiveRecordAdapter for details.
fork_safe:      when true, also wires Process._fork (Ruby 3.1+) so any
                pool registered with ForkSafe.register is reset in
                each child process — eliminates the pre-fork PG fd
                sharing trap on Hyperion `-w N` / Puma cluster mode
                without forcing users to write `on_worker_boot`.

Backward-compatible: prior callers used ‘install!` with no args. The PG patch is idempotent — first call wins, subsequent calls return false without re-prepending. The `activerecord:` and `fork_safe:` installs are independent and idempotent in their own right (each has its own internal guard), so they run on every call where the kwarg is true.



72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/hyperion/async_pg.rb', line 72

def install!(methods: DEFAULT_METHODS, activerecord: false, fork_safe: false)
  first_install = false
  @install_mutex.synchronize do
    unless @installed
      ::PG::Connection.prepend(Patches.new(methods))
      @installed = true
      first_install = true
    end
  end
  ActiveRecordAdapter.install! if activerecord
  ForkSafe.install! if fork_safe
  first_install
end

.installed?Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/hyperion/async_pg.rb', line 86

def installed?
  @installed
end

.read_timeoutObject

Read timeout for a single wait_readable call. nil = block forever (matches pg’s default; rely on server-side statement_timeout to cap runaway queries). Tunable via env so operators don’t need a redeploy.



100
101
102
103
104
105
# File 'lib/hyperion/async_pg.rb', line 100

def read_timeout
  v = ENV['HYPERION_ASYNC_PG_READ_TIMEOUT']
  return nil if v.nil? || v.empty?

  v.to_f
end

.reset_for_tests!Object

Resets internal install flag. Test-only — does NOT remove the prepended module from PG::Connection’s ancestor chain (Ruby has no public API for un-prepending). Useful for verifying the idempotency guard in specs.



93
94
95
# File 'lib/hyperion/async_pg.rb', line 93

def reset_for_tests!
  @install_mutex.synchronize { @installed = false }
end

.wait_until_ready(conn) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/hyperion/async_pg.rb', line 123

def wait_until_ready(conn)
  timeout = read_timeout
  loop do
    conn.consume_input
    break unless conn.is_busy

    # IO#wait_readable cooperates with Fiber.scheduler when one is
    # current (Async sets it on every Async::Task). Without a scheduler
    # this blocks the OS thread, identical to pg's normal behaviour.
    ready = conn.socket_io.wait_readable(timeout)
    raise ::PG::ConnectionBad, 'pg socket wait_readable timed out' if ready.nil?
  end
end