Class: Pgtk::Pool
- Inherits:
- Object
- Defined in:
- lib/pgtk/pool.rb,
lib/pgtk/pool/txn.rb,
lib/pgtk/pool/busy.rb,
lib/pgtk/pool/iterable_queue.rb
Overview
Pool provides a connection pool for PostgreSQL database connections. It manages a fixed number of connections to optimize performance and resource usage while providing a simple interface for database operations.
The Pool class handles connection lifecycle, reconnects on errors, and provides transaction support. It's the core class for interacting with a PostgreSQL database in this library.
Basic usage:
# Create a wire that knows how to connect to PostgreSQL
wire = Pgtk::Wire::Direct.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydatabase',
  user: 'postgres',
  password: 'secret'
)

# Create and start a connection pool with 4 connections
pool = Pgtk::Pool.new(wire, max: 4)
pool.start!

# Execute a simple query
pool.exec('SELECT * FROM users')

# Execute a parameterized query
pool.exec('SELECT * FROM users WHERE email = $1', ['user@example.com'])

# Use transactions for multiple operations
pool.transaction do |t|
  t.exec('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [100, 42])
  t.exec('UPDATE accounts SET balance = balance + $1 WHERE id = $2', [100, 43])
end
- Author
Yegor Bugayenko (yegor256@gmail.com)
- Copyright
Copyright (c) 2019-2026 Yegor Bugayenko
- License
MIT
Defined Under Namespace
Classes: Busy, IterableQueue, Txn
Instance Method Summary
-
#dump ⇒ String
Get as many details about the pool as possible.
-
#exec(query, args = [], result = 0) {|Hash| ... } ⇒ Object
Make a query and return the result as an array of hashes.
-
#initialize(wire, max: 8, timeout: 1, idle: 60, log: Loog::NULL) ⇒ Pool
Constructor.
-
#start! ⇒ Object
Start it with a fixed number of connections.
-
#transaction ⇒ Object
Run a transaction.
-
#version ⇒ String
Get the version of PostgreSQL server.
Constructor Details
#initialize(wire, max: 8, timeout: 1, idle: 60, log: Loog::NULL) ⇒ Pool
Constructor.
The idle option guards against the cold-slot SSL desync that bites
managed PostgreSQL behind a TLS proxy: a slot sits idle long enough for
the proxy and the client to disagree about SSL state, libpq still reports
CONNECTION_OK, and the next real query blows up with a decryption error.
When a slot has been idle longer than idle seconds, the pool runs
SELECT 1 on it before yielding; if that fails, the slot is renewed
in-line and the caller never sees the error. Set to nil to skip
validation entirely (e.g. for local Unix-socket PostgreSQL).
# File 'lib/pgtk/pool.rb', line 70
def initialize(wire, max: 8, timeout: 1, idle: 60, log: Loog::NULL)
  @wire = wire
  @max = max
  @idle = idle
  @log = log
  @pool = IterableQueue.new(max, timeout)
  @lock = Mutex.new
  @started = false
end
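The idle check described above can be illustrated with a minimal sketch. It is not the pool's actual implementation: `Slot`, `checkout`, `LiveConn`, and `StaleConn` are hypothetical names introduced here, and the fake connections only imitate the behaviour of a real one.

```ruby
# Hypothetical stand-in for one pool slot; not part of Pgtk.
Slot = Struct.new(:conn, :used_at)

# Hands out the slot's connection. If the slot has been idle longer than
# +idle+ seconds, it is probed with SELECT 1 first and replaced by the
# block's result when the probe fails. A nil +idle+ skips the probe.
def checkout(slot, idle:, now: Time.now)
  if idle && now - slot.used_at > idle
    begin
      slot.conn.exec('SELECT 1')
    rescue StandardError
      slot.conn = yield
    end
  end
  slot.used_at = now
  slot.conn
end

# Fake connections for the demonstration (assumptions, not PG classes).
class LiveConn
  def exec(_sql)
    :ok
  end
end

class StaleConn
  def exec(_sql)
    raise IOError, 'decryption failed'
  end
end

slot = Slot.new(StaleConn.new, Time.at(0))
conn = checkout(slot, idle: 60, now: Time.at(100)) { LiveConn.new }
puts conn.class
```

The caller receives a working connection and never sees the probe failure. Passing `idle: nil` returns the slot's connection without probing it.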
Instance Method Details
#dump ⇒ String
Get as many details about the pool as possible.
# File 'lib/pgtk/pool.rb', line 95
def dump
  [
    'Pgtk::Pool',
    " Pgtk version: #{Pgtk::VERSION}",
    " PgSQL version: #{version}",
    " #{@pool.size} connections:",
    @pool.map { |conn| info(conn) }
  ].flatten.join("\n")
end
#exec(query, args = [], result = 0) {|Hash| ... } ⇒ Object
Make a query and return the result as an array of hashes. For example, in order to fetch the list of all books belonging to the user:
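# Assumes the book table has id, created, and title columns
require 'time'
books = pool.exec('SELECT * FROM book WHERE owner = $1', ['yegor256'])
books.each do |row|
  puts "ID: #{row['id'].to_i}"
  puts "Created: #{Time.parse(row['created'])}"
  puts "Title: #{row['title']}"
end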
All values in the retrieved hash are strings. No matter what types of data you have in the database, you get strings here. It's your job to convert them to the type you need.
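As a small illustration of that conversion, the sketch below uses a hand-built hash shaped like a row from exec. The column names and values are assumptions made for the example, not data from a real table.

```ruby
require 'time'

# A row shaped like the ones exec returns: every value is a String.
# Column names and values are hypothetical.
row = {
  'id' => '42',
  'created' => '2024-01-15 10:30:00 UTC',
  'title' => 'Elegant Objects'
}

id = Integer(row['id'])               # raises ArgumentError on non-numeric input
created = Time.parse(row['created'])  # String -> Time
title = row['title']                  # already the type we want

puts id + 1
puts created.year
puts title
```

`Integer()` is used here instead of `to_i` because it fails loudly on malformed input, while `to_i` silently returns 0.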
In order to insert a new row (pay attention to the RETURNING clause
at the end of the SQL query):
id = pool.exec(
  'INSERT INTO book (owner, title) VALUES ($1, $2) RETURNING id',
  ['yegor256', 'Elegant Objects']
)[0]['id'].to_i
You can also pass a block to this method, if you want to get an instance
of PG::Result instead of an array of hashes:
pool.exec('SELECT * FROM book WHERE owner = $1', ['yegor256']) do |res|
  res.each do |row|
    puts "ID: #{row['id'].to_i}"
    puts "Title: #{row['title']}"
  end
end
When the query is too long it's convenient to use an array to specify it:
pool.exec(
  [
    'SELECT * FROM book',
    'LEFT JOIN user ON user.id = book.owner',
    'WHERE user.login = $1 AND book.title = $2'
  ],
  ['yegor256', 'Elegant Objects']
)
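One plausible way an array query becomes a single statement is sketched below. The helper `to_sql` is hypothetical, and joining the fragments with a single space is an assumption rather than a documented detail of this library.

```ruby
# Hypothetical helper: accepts a String or an Array of fragments and
# returns one SQL string. Joining with a space is an assumption.
def to_sql(query)
  query.is_a?(Array) ? query.join(' ') : query
end

puts to_sql(
  [
    'SELECT * FROM book',
    'WHERE title = $1'
  ]
)
```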
For more details about exec_params, which this method calls, see:
https://www.rubydoc.info/gems/pg/0.17.1/PG%2FConnection:exec_params
# File 'lib/pgtk/pool.rb', line 180
def exec(query, args = [], result = 0, &block)
  connect do |c|
    t = Txn.new(c, @log)
    if block_given?
      t.exec(query, args, result, &block)
    else
      t.exec(query, args, result)
    end
  end
end
#start! ⇒ Object
Start it with a fixed number of connections. The number of connections
is specified in the max argument and should be large enough to handle
the number of parallel connections you may need to the database. However,
keep in mind that not all servers allow many connections
open at the same time. For example, the free Heroku PostgreSQL database
allows only one open connection.
# File 'lib/pgtk/pool.rb', line 111
def start!
  @lock.synchronize do
    return if @started
    @max.times do
      @pool.push(@wire.connection)
    end
    (2 * @max).times do
      connect { |c| c.exec('SELECT 1') }
    rescue StandardError => e
      @log.warn("Pool warm-up query failed, slot will be retried: #{e.message.strip}")
    end
    @max.times do
      connect { |c| c.exec('SELECT 1') }
    end
    @started = true
    @log.debug("PostgreSQL pool started with #{@max} connections")
  end
end
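The idea of a pool that is filled once with a fixed number of connections can be shown with a simplified sketch. `TinyPool` is a hypothetical class built on Ruby's standard `Queue`; it leaves out the warm-up queries, timeouts, and reconnection logic of the real pool.

```ruby
# Hypothetical, simplified fixed-size pool; not the Pgtk implementation.
class TinyPool
  def initialize(max, &factory)
    @max = max
    @factory = factory
    @queue = Queue.new
    @lock = Mutex.new
    @started = false
  end

  # Fills the pool exactly once, even if called repeatedly.
  def start!
    @lock.synchronize do
      return if @started
      @max.times { @queue.push(@factory.call) }
      @started = true
    end
  end

  # Borrows a connection for the duration of the block and always returns it.
  def with
    conn = @queue.pop
    begin
      yield conn
    ensure
      @queue.push(conn)
    end
  end

  def size
    @queue.size
  end
end

pool = TinyPool.new(3) { Object.new }
pool.start!
pool.start! # second call is a no-op
puts pool.size
```

Because `Queue#pop` blocks when the queue is empty, a fourth concurrent caller would wait until one of the three connections is returned, which is why max should match the expected parallelism.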
#transaction ⇒ Object
Run a transaction. A block is required. It receives a temporary
object whose exec method works exactly like Pool#exec. The transaction
is committed when the block returns and rolled back if it does not
finish cleanly. For example:
pgsql.transaction do |t|
  t.exec('DELETE FROM user WHERE id = $1', [id])
  t.exec('INSERT INTO user (name) VALUES ($1)', [name])
end
# File 'lib/pgtk/pool.rb', line 199
def transaction
  connect do |c|
    t = Txn.new(c, @log)
    t.exec('START TRANSACTION')
    begin
      yield(t).tap { t.exec('COMMIT') }
    ensure
      if c.transaction_status != PG::Constants::PQTRANS_IDLE
        begin
          t.exec('ROLLBACK')
        rescue StandardError => e
          @log.warn("Failed to rollback transaction: #{e.message}")
        end
      end
    end
  end
end
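The commit-or-rollback flow in the listing above can be traced without a database. In this sketch `FakeConn` and `in_transaction` are hypothetical: the fake connection records each statement and tracks whether a transaction is open, standing in for the transaction_status check.

```ruby
# Hypothetical recorder standing in for a PG connection; not part of Pgtk.
class FakeConn
  attr_reader :log

  def initialize
    @log = []
    @open = false
  end

  def exec(sql)
    @log << sql
    @open = true if sql == 'START TRANSACTION'
    @open = false if %w[COMMIT ROLLBACK].include?(sql)
  end

  def open?
    @open
  end
end

# Commits when the block returns; rolls back if a transaction is still open.
def in_transaction(conn)
  conn.exec('START TRANSACTION')
  begin
    yield(conn).tap { conn.exec('COMMIT') }
  ensure
    conn.exec('ROLLBACK') if conn.open?
  end
end

good = FakeConn.new
in_transaction(good) { |c| c.exec('UPDATE accounts SET balance = 0') }
puts good.log.last

bad = FakeConn.new
begin
  in_transaction(bad) { raise 'boom' }
rescue RuntimeError
  puts bad.log.last
end
```

The block's return value passes through `tap`, so the caller gets the result of its own block, and the exception from a failed block still propagates after the rollback.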
#version ⇒ String
Get the version of PostgreSQL server.
# File 'lib/pgtk/pool.rb', line 83
def version
  @version ||=
    begin
      conn = @pool.pop
      @pool.push(conn)
      conn.parameter_status('server_version').split[0]
    end
end
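The `split[0]` call in the listing keeps only the first whitespace-separated token of the server_version string. The sample strings below are assumptions about what a server might report, and `short_version` is a hypothetical helper used only for this illustration.

```ruby
# Hypothetical helper mirroring the split[0] step from the listing.
def short_version(raw)
  raw.split[0]
end

# Sample server_version values (assumed for illustration).
samples = ['16.2', '14.11 (Ubuntu 14.11-0ubuntu0.22.04.1)']
samples.each { |s| puts short_version(s) }
```

Any distribution-specific suffix after the first space is dropped, so callers get just the numeric version.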