Class: Hyperion::AsyncPg::FiberPool

Inherits:
Object
Defined in:
lib/hyperion/async_pg/fiber_pool.rb

Overview

Fiber-aware connection pool. The popular `connection_pool` gem uses a Mutex and a ConditionVariable, neither of which yields to the Async fiber scheduler: a fiber waiting for a connection blocks the whole OS thread, and the benefit of the async-pg shim evaporates.

FiberPool wraps an Async::Semaphore (which is fiber-aware) around a plain Array. Acquiring a connection means waiting for a slot on the semaphore; waiting fibers yield cooperatively, so the OS thread keeps serving others. Each pop and push on the Array is atomic within a thread under the GVL.

Usage:

pool = Hyperion::AsyncPg::FiberPool.new(size: 64) do
  PG.connect(ENV['DATABASE_URL'])
end

pool.with do |conn|
  conn.exec_params('SELECT * FROM users WHERE id = $1', [user_id])
end


Constructor Details

#initialize(size:, parallel_fill_threads: 8, &factory) ⇒ FiberPool

Returns a new instance of FiberPool.

Raises:

  • (ArgumentError) if size is not a positive Integer, or if no factory block is given


# File 'lib/hyperion/async_pg/fiber_pool.rb', line 26

def initialize(size:, parallel_fill_threads: 8, &factory)
  raise ArgumentError, 'size must be a positive Integer' unless size.is_a?(Integer) && size.positive?
  raise ArgumentError, 'block (factory) is required' unless factory

  @size      = size
  @factory   = factory
  @available = []
  @semaphore = ::Async::Semaphore.new(size)
  @parallel_fill_threads = parallel_fill_threads
  @mutex     = Mutex.new
  @filled    = false
end
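
The two guard clauses above decide which calls raise ArgumentError. The following is a minimal sketch of that behavior, assuming a hypothetical stand-in class (ValidationSketch) and helper (rejection_message) that copy only the guards; it does not load the real FiberPool or the async gem.

```ruby
# Hypothetical stand-in: copies only the two guard clauses from
# FiberPool#initialize. It is not the real class.
class ValidationSketch
  attr_reader :size

  def initialize(size:, &factory)
    raise ArgumentError, 'size must be a positive Integer' unless size.is_a?(Integer) && size.positive?
    raise ArgumentError, 'block (factory) is required' unless factory

    @size    = size
    @factory = factory
  end
end

# Hypothetical helper: returns the ArgumentError message raised by the
# block, or nil when the block completes without raising.
def rejection_message
  yield
  nil
rescue ArgumentError => e
  e.message
end

p rejection_message { ValidationSketch.new(size: 0) { :conn } }    # zero is not positive
p rejection_message { ValidationSketch.new(size: 2.0) { :conn } }  # Float is not an Integer
p rejection_message { ValidationSketch.new(size: 4) }              # no factory block
p rejection_message { ValidationSketch.new(size: 4) { :conn } }    # accepted
```

Note that the guards do not inspect parallel_fill_threads; only size and the block are validated.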

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



# File 'lib/hyperion/async_pg/fiber_pool.rb', line 39

def size
  @size
end

Instance Method Details

#close ⇒ Object

Close every connection in the pool. Safe to call from a worker shutdown hook so that PG sockets get a clean FIN instead of being garbage-collected later. Calls `#close` on each connection that responds to it, and returns self.



# File 'lib/hyperion/async_pg/fiber_pool.rb', line 74

def close
  @mutex.synchronize do
    while (conn = @available.pop)
      begin
        conn.close if conn.respond_to?(:close)
      rescue StandardError
        # conn already broken — keep closing the rest
      end
    end
    @filled = false
  end
  self
end
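
To illustrate why the rescue sits inside the loop, here is a small sketch that lifts the drain loop out of the class and runs it against hypothetical stand-in connections (FakeConn and drain are illustration-only names, not part of the library).

```ruby
# Hypothetical stand-in connection, used only to exercise the drain loop.
class FakeConn
  attr_reader :closed

  def initialize(broken: false)
    @broken = broken
    @closed = false
  end

  def close
    raise IOError, 'socket already gone' if @broken

    @closed = true
  end
end

# Same loop shape as FiberPool#close, without the mutex or the @filled flag.
def drain(available)
  while (conn = available.pop)
    begin
      conn.close if conn.respond_to?(:close)
    rescue StandardError
      # A broken connection must not stop the remaining ones from closing.
    end
  end
  available
end

healthy = [FakeConn.new, FakeConn.new]
pool    = [healthy[0], FakeConn.new(broken: true), :not_closable, healthy[1]]

drain(pool)
p pool.empty?             # the array is fully drained
p healthy.all?(&:closed)  # healthy connections closed despite the broken one
```

The loop only sees connections that are sitting in the array at the time; as the source above is written, a connection checked out during close is not closed by it.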

#fill ⇒ Object

Open `size` connections, in parallel by default. Over a WAN, where PG.connect takes roughly 50-400 ms, filling a large pool sequentially blows past server-bind windows. Idempotent: subsequent calls are no-ops. Returns self for chaining.



# File 'lib/hyperion/async_pg/fiber_pool.rb', line 45

def fill
  @mutex.synchronize do
    return self if @filled

    @available.concat(parallel_fill(@size))
    @filled = true
  end
  self
end
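
The body of parallel_fill is not shown on this page. As a rough sketch of the general idea only, and not the library's implementation, a bounded set of worker threads can call the factory concurrently. The helper name parallel_fill_sketch and its threads: keyword are assumptions made for this example.

```ruby
# Hypothetical helper: opens `count` connections using at most `threads`
# worker threads. This is an assumed approach, not the real parallel_fill.
def parallel_fill_sketch(count, threads:, &factory)
  jobs = Queue.new
  count.times { jobs << true }
  jobs.close # once closed and empty, pop returns nil and workers stop

  results = Queue.new
  workers = Array.new([threads, count].min) do
    Thread.new do
      results << factory.call while jobs.pop
    end
  end
  workers.each(&:join) # join re-raises if a factory call failed

  Array.new(results.size) { results.pop }
end

opened = parallel_fill_sketch(16, threads: 8) do
  sleep 0.05  # stands in for PG.connect latency
  Object.new  # stands in for a PG::Connection
end

p opened.size
```

With 8 workers, the 16 simulated connects run in roughly two batches instead of sixteen sequential waits, which is the effect the description above is after.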

#with ⇒ Object

Check out a connection, yield it to the block, and return it to the pool afterwards, even if the block raises. If the pool isn’t filled yet, it is filled lazily on first use.



# File 'lib/hyperion/async_pg/fiber_pool.rb', line 57

def with
  fill unless @filled
  @semaphore.acquire do
    conn = @available.pop
    raise 'FiberPool: empty pool — semaphore + Array invariant violated' if conn.nil?

    begin
      yield conn
    ensure
      @available.push(conn)
    end
  end
end
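
The begin/ensure pair is what keeps the Array and the semaphore in step. The sketch below reproduces only that pop, yield, ensure-push shape in a hypothetical stand-in class (CheckoutSketch); it deliberately leaves out the Async::Semaphore so it runs without the async gem, and so it shows the return-on-error behavior but not fiber scheduling.

```ruby
# Hypothetical stand-in: same checkout shape as FiberPool#with,
# minus the semaphore and the lazy fill.
class CheckoutSketch
  attr_reader :available

  def initialize(conns)
    @available = conns
  end

  def with
    conn = @available.pop
    raise 'empty pool' if conn.nil?

    begin
      yield conn
    ensure
      # Runs on both normal exit and exception, so the slot is never lost.
      @available.push(conn)
    end
  end
end

pool   = CheckoutSketch.new(%i[conn_a conn_b])
result = pool.with { |conn| "ran on #{conn}" }

begin
  pool.with { |_conn| raise 'query failed' }
rescue RuntimeError => e
  failure = e.message
end

p result
p failure
p pool.available.size # both connections are back in the pool
```

In this sketch the block's value is what `with` returns, and the exception from the failing block still propagates to the caller after the connection has been pushed back.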