Module: Hyperion::AsyncPg

Defined in:
lib/hyperion/async_pg.rb,
lib/hyperion/async_pg/version.rb,
lib/hyperion/async_pg/fiber_pool.rb

Overview

Companion gem for the Hyperion HTTP server. It makes pg’s Connection#exec_params (and friends) yield to the Async fiber scheduler while waiting on the Postgres TCP socket. Outside a scheduler (no current fiber, or a fiber not driven by an Async-compatible scheduler), behaviour is identical to plain pg, because IO#wait_readable falls back to its blocking implementation.

Usage:

require 'hyperion/async_pg'
Hyperion::AsyncPg.install!

After install, every subsequent PG::Connection#exec_params / exec / query call routes through the async-aware variant. ActiveRecord, Sequel, ROM, and direct pg callers all benefit transparently.

Defined Under Namespace

Classes: Error, FiberPool, Patches

Constant Summary

DEFAULT_METHODS =

Methods on PG::Connection that we redirect through the async dispatch loop. Each maps to a non-blocking send_* counterpart inside the C extension; see SEND_METHODS below.

%i[
  exec
  exec_params
  exec_prepared
  query
  async_exec
  async_exec_params
].freeze
SEND_METHODS =
{
  exec: :send_query,
  query: :send_query,
  async_exec: :send_query,
  exec_params: :send_query_params,
  async_exec_params: :send_query_params,
  exec_prepared: :send_query_prepared
}.freeze
VERSION =
'0.3.0'
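
The two constants above suggest how Patches might work: a Module subclass that defines one wrapper per listed method and forwards to the matching send_* call. The sketch below illustrates that pattern only; it is not the gem's actual Patches source. FakeConnection, SketchPatches, and SEND_METHODS_SKETCH are hypothetical names, and the wrapper returns a marker symbol instead of calling Hyperion::AsyncPg.dispatch.

```ruby
# Hypothetical stand-in for PG::Connection; it records each send_* call.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  # The "blocking" method that the prepended module will shadow.
  def exec(_sql)
    :blocking_path
  end

  def send_query(sql)
    @sent << [:send_query, sql]
  end
end

# Assumption: a trimmed copy of the mapping, enough for this sketch.
SEND_METHODS_SKETCH = { exec: :send_query }.freeze

# Subclassing Module lets `prepend(SketchPatches.new(methods))` work,
# mirroring the `Patches.new(methods)` call in install!.
class SketchPatches < Module
  def initialize(methods)
    super()
    methods.each do |name|
      send_method = SEND_METHODS_SKETCH.fetch(name)
      define_method(name) do |*args, **kwargs, &_block|
        # The real gem hands over to AsyncPg.dispatch at this point.
        public_send(send_method, *args, **kwargs)
        :async_path
      end
    end
  end
end

FakeConnection.prepend(SketchPatches.new(%i[exec]))
```

Because the module is prepended, its exec is found before FakeConnection#exec, so existing callers need no changes.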

Class Method Details

.dispatch(conn, send_method, args, kwargs, &block) ⇒ Object

Internal entry point used by Patches. Public so the prepended module can call it without exposing the patched method to send().



# File 'lib/hyperion/async_pg.rb', line 83

def dispatch(conn, send_method, args, kwargs, &block)
  if kwargs.empty?
    conn.public_send(send_method, *args)
  else
    conn.public_send(send_method, *args, **kwargs)
  end
  wait_until_ready(conn)
  result = drain_results(conn)
  result&.check
  return yield result if block

  result
end

.drain_results(conn) ⇒ Object



# File 'lib/hyperion/async_pg.rb', line 111

def drain_results(conn)
  last = nil
  while (r = conn.get_result)
    last&.clear
    last = r
  end
  last
end
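
To make the draining behaviour concrete, here is a small sketch that runs the same loop against a fake connection. FakeResult and QueueConnection are hypothetical test doubles, not part of pg or this gem. The only assumption about the real API is that get_result returns nil once no results remain.

```ruby
# Hypothetical result object that remembers whether clear was called.
FakeResult = Struct.new(:label, :cleared) do
  def clear
    self.cleared = true
  end
end

# Hypothetical connection that hands out queued results, then nil.
class QueueConnection
  def initialize(results)
    @results = results.dup
  end

  def get_result
    @results.shift
  end
end

# Same loop as the documented method, reproduced so the sketch runs alone.
def drain_results(conn)
  last = nil
  while (r = conn.get_result)
    last&.clear # free every result except the final one
    last = r
  end
  last
end

first  = FakeResult.new('first', false)
second = FakeResult.new('second', false)
final  = drain_results(QueueConnection.new([first, second]))
```

Only the last result is returned to the caller. Earlier results are cleared as soon as a newer one arrives, and an empty queue yields nil.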

.install!(methods: DEFAULT_METHODS) ⇒ Object



# File 'lib/hyperion/async_pg.rb', line 50

def install!(methods: DEFAULT_METHODS)
  @install_mutex.synchronize do
    return false if @installed

    ::PG::Connection.prepend(Patches.new(methods))
    @installed = true
  end
  true
end
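
The idempotency guard can be exercised in isolation. The sketch below uses a hypothetical InstallSketch module with a counter in place of the real PG::Connection.prepend call. It shows that an early return from inside synchronize still releases the mutex.

```ruby
# Hypothetical module mirroring the install! guard; not the gem's code.
module InstallSketch
  @mutex = Mutex.new
  @installed = false
  @runs = 0

  class << self
    attr_reader :runs

    def install!
      @mutex.synchronize do
        return false if @installed

        @runs += 1 # stands in for ::PG::Connection.prepend(...)
        @installed = true
      end
      true
    end

    def locked?
      @mutex.locked?
    end
  end
end

first_call  = InstallSketch.install!
second_call = InstallSketch.install!
```

The first call performs the work and returns true. Later calls return false without repeating it.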

.installed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/hyperion/async_pg.rb', line 60

def installed?
  @installed
end

.read_timeout ⇒ Object

Read timeout, in seconds, for a single wait_readable call. A nil value blocks indefinitely, which matches pg’s default; rely on server-side statement_timeout to cap runaway queries. Tunable via the HYPERION_ASYNC_PG_READ_TIMEOUT environment variable so operators don’t need a redeploy.



# File 'lib/hyperion/async_pg.rb', line 74

def read_timeout
  v = ENV['HYPERION_ASYNC_PG_READ_TIMEOUT']
  return nil if v.nil? || v.empty?

  v.to_f
end
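
The parsing rules can be checked without touching the process environment. The sketch below copies the logic into a hypothetical parse_read_timeout helper that takes a plain Hash instead of ENV.

```ruby
# Hypothetical helper: same logic as read_timeout, with the environment
# passed in so the behaviour is easy to inspect.
def parse_read_timeout(env)
  v = env['HYPERION_ASYNC_PG_READ_TIMEOUT']
  return nil if v.nil? || v.empty?

  v.to_f
end

unset   = parse_read_timeout({})
blank   = parse_read_timeout('HYPERION_ASYNC_PG_READ_TIMEOUT' => '')
seconds = parse_read_timeout('HYPERION_ASYNC_PG_READ_TIMEOUT' => '2.5')
garbage = parse_read_timeout('HYPERION_ASYNC_PG_READ_TIMEOUT' => 'abc')
```

Unset and empty values both give nil. One caveat follows from String#to_f: a non-numeric value such as 'abc' parses to 0.0 rather than nil, so a typo in the variable would presumably act as a zero-second timeout instead of disabling it.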

.reset_for_tests! ⇒ Object

Resets internal install flag. Test-only — does NOT remove the prepended module from PG::Connection’s ancestor chain (Ruby has no public API for un-prepending). Useful for verifying the idempotency guard in specs.



# File 'lib/hyperion/async_pg.rb', line 67

def reset_for_tests!
  @install_mutex.synchronize { @installed = false }
end

.wait_until_ready(conn) ⇒ Object



# File 'lib/hyperion/async_pg.rb', line 97

def wait_until_ready(conn)
  timeout = read_timeout
  loop do
    conn.consume_input
    break unless conn.is_busy

    # IO#wait_readable cooperates with Fiber.scheduler when one is
    # current (Async sets it on every Async::Task). Without a scheduler
    # this blocks the OS thread, identical to pg's normal behaviour.
    ready = conn.socket_io.wait_readable(timeout)
    raise ::PG::ConnectionBad, 'pg socket wait_readable timed out' if ready.nil?
  end
end
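
The timeout branch relies on IO#wait_readable returning nil when the timeout elapses with no data available. The sketch below shows that contract on an ordinary pipe, with no Postgres connection and no scheduler involved. The 0.05 second timeout is an arbitrary value chosen for the example.

```ruby
require 'io/wait' # harmless on newer Rubies, where wait_readable is in core

reader, writer = IO.pipe

# Nothing has been written yet, so the wait expires and returns nil.
timed_out = reader.wait_readable(0.05)

# Once a byte is available, the wait returns a truthy value immediately.
writer.write('x')
ready = reader.wait_readable(0.05)

reader.close
writer.close
```

In wait_until_ready the nil case is turned into PG::ConnectionBad, while a truthy result sends the loop back to consume_input.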