Class: Pgtk::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/pgtk/pool.rb,
lib/pgtk/pool/txn.rb,
lib/pgtk/pool/busy.rb,
lib/pgtk/pool/iterable_queue.rb

Overview

Pool provides a connection pool for PostgreSQL database connections. It manages a fixed number of connections to optimize performance and resource usage while providing a simple interface for database operations.

The Pool class handles connection lifecycle, reconnects on errors, and provides transaction support. It's the core class for interacting with a PostgreSQL database in this library.

Basic usage:

# Create a wire that knows how to connect to PostgreSQL
wire = Pgtk::Wire::Direct.new(
host: 'localhost',
port: 5432,
dbname: 'mydatabase',
user: 'postgres',
password: 'secret'
)

# Create and start a connection pool with 4 connections
pool = Pgtk::Pool.new(wire, max: 4)
pool.start!

# Execute a simple query
pool.exec('SELECT * FROM users')

# Execute a parameterized query
pool.exec('SELECT * FROM users WHERE email = $1', ['user@example.com'])

# Use transactions for multiple operations
pool.transaction do |t|
t.exec('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [100, 42])
t.exec('UPDATE accounts SET balance = balance + $1 WHERE id = $2', [100, 43])
end
Author

Yegor Bugayenko (yegor256@gmail.com)

Copyright

Copyright (c) 2019-2026 Yegor Bugayenko

License

MIT

Defined Under Namespace

Classes: Busy, IterableQueue, Txn

Instance Method Summary collapse

Constructor Details

#initialize(wire, max: 8, timeout: 1, idle: 60, log: Loog::NULL) ⇒ Pool

Constructor.

The idle option guards against the cold-slot SSL desync that bites managed PostgreSQL behind a TLS proxy: a slot sits idle long enough for the proxy and the client to disagree about SSL state, libpq still reports CONNECTION_OK, and the next real query blows up with a decryption error. When a slot has been idle longer than idle seconds, the pool runs SELECT 1 on it before yielding; if that fails, the slot is renewed in-line and the caller never sees the error. Set to nil to skip validation entirely (e.g. for local Unix-socket PostgreSQL).

Parameters:

  • wire (Pgtk::Wire)

    The wire

  • max (Integer) (defaults to: 8)

    Total amount of PostgreSQL connections in the pool

  • timeout (Numeric) (defaults to: 1)

    Max seconds to wait for a free connection

  • idle (Numeric, nil) (defaults to: 60)

    Seconds of idleness after which to validate a connection on checkout, or nil to disable validation

  • log (Object) (defaults to: Loog::NULL)

    The log



70
71
72
73
74
75
76
77
78
# File 'lib/pgtk/pool.rb', line 70

def initialize(wire, max: 8, timeout: 1, idle: 60, log: Loog::NULL)
  @wire = wire
  @max = max
  @idle = idle
  @log = log
  @pool = IterableQueue.new(max, timeout)
  @lock = Mutex.new
  @started = false
end

Instance Method Details

#dumpString

Get as much details about it as possible.

Returns:

  • (String)

    Summary of inner state



95
96
97
98
99
100
101
102
103
# File 'lib/pgtk/pool.rb', line 95

def dump
  [
    'Pgtk::Pool',
    "  Pgtk version: #{Pgtk::VERSION}",
    "  PgSQL version: #{version}",
    "  #{@pool.size} connections:",
    @pool.map { |conn| info(conn) }
  ].flatten.join("\n")
end

#exec(query, args = [], result = 0) {|Hash| ... } ⇒ Object

Make a query and return the result as an array of hashes. For example, in order to fetch the list of all books belonging to the user:

books = pool.exec('SELECT * FROM book WHERE owner = $1', ['yegor256']) books.each do |row| puts 'ID: ' + row.to_i puts 'Created: ' + Time.parse(row) puts 'Title: ' + row end

All values in the retrieved hash are strings. No matter what types of data you have in the database, you get strings here. It's your job to convert them to the type you need.

In order to insert a new row (pay attention to the RETURNING clause at the end of the SQL query):

id = pool.exec( 'INSERT INTO book (owner, title) VALUES ($1, $2) RETURNING id', ['yegor256', 'Elegant Objects'] )[0]['id'].to_i

You can also pass a block to this method, if you want to get an instance of PG::Result instead of an array of hashes:

pool.exec('SELECT * FROM book WHERE owner = $1', ['yegor256']) do |res| res.each do |row| puts 'ID: ' + row.to_i puts 'Title: ' + row end end

When the query is too long it's convenient to use an array to specify it:

pool.exec( [ 'SELECT * FROM book', 'LEFT JOIN user ON user.id = book.owner', 'WHERE user.login = $1 AND book.title = $2' ], ['yegor256', 'Elegant Objects'] )

More details about exec_params, which is called here, you can find here: https://www.rubydoc.info/gems/pg/0.17.1/PG%2FConnection:exec_params

Parameters:

  • query (String)

    The SQL query with params inside (possibly)

  • args (Array) (defaults to: [])

    List of arguments

  • result (Integer) (defaults to: 0)

    Should be 0 for text results, 1 for binary

Yields:

  • (Hash)

    Rows



180
181
182
183
184
185
186
187
188
189
# File 'lib/pgtk/pool.rb', line 180

def exec(query, args = [], result = 0, &block)
  connect do |c|
    t = Txn.new(c, @log)
    if block_given?
      t.exec(query, args, result, &block)
    else
      t.exec(query, args, result)
    end
  end
end

#start!Object

Start it with a fixed number of connections. The amount of connections is specified in max argument and should be big enough to handle the amount of parallel connections you may have to the database. However, keep in mind that not all servers will allow you to have many connections open at the same time. For example, Heroku free PostgreSQL database allows only one connection open.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/pgtk/pool.rb', line 111

def start!
  @lock.synchronize do
    return if @started
    @max.times do
      @pool.push(@wire.connection)
    end
    (2 * @max).times do
      connect { |c| c.exec('SELECT 1') }
    rescue StandardError => e
      @log.warn("Pool warm-up query failed, slot will be retried: #{e.message.strip}")
    end
    @max.times do
      connect { |c| c.exec('SELECT 1') }
    end
    @started = true
    @log.debug("PostgreSQL pool started with #{@max} connections")
  end
end

#transactionObject

Run a transaction. The block has to be provided. It will receive a temporary object, which implements method exec, which works exactly like the method exec of class Pool, for example:

pgsql.transaction do |t| t.exec('DELETE FROM user WHERE id = $1', [id]) t.exec('INSERT INTO user (name) VALUES ($1)', [name]) end



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/pgtk/pool.rb', line 199

def transaction
  connect do |c|
    t = Txn.new(c, @log)
    t.exec('START TRANSACTION')
    begin
      yield(t).tap { t.exec('COMMIT') }
    ensure
      if c.transaction_status != PG::Constants::PQTRANS_IDLE
        begin
          t.exec('ROLLBACK')
        rescue StandardError => e
          @log.warn("Failed to rollback transaction: #{e.message}")
        end
      end
    end
  end
end

#versionString

Get the version of PostgreSQL server.

Returns:

  • (String)

    Version of PostgreSQL server



83
84
85
86
87
88
89
90
# File 'lib/pgtk/pool.rb', line 83

def version
  @version ||=
    begin
      conn = @pool.pop
      @pool.push(conn)
      conn.parameter_status('server_version').split[0]
    end
end