Class: PGMQ::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/pgmq/connection.rb

Overview

Manages database connections for PGMQ

Supports multiple connection strategies:

  • Connection strings

  • Hash of connection parameters

  • Callable objects (for Rails ActiveRecord integration)

Examples:

With connection string

conn = PGMQ::Connection.new("postgres://localhost/mydb")

With connection hash

conn = PGMQ::Connection.new(host: 'localhost', dbname: 'mydb')

With Rails ActiveRecord (reuses Rails connection pool)

conn = PGMQ::Connection.new(-> { ActiveRecord::Base.connection.raw_connection })

Constant Summary collapse

DEFAULT_POOL_SIZE =

Default connection pool size

5
DEFAULT_POOL_TIMEOUT =

Default connection pool timeout in seconds

5

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn_params, pool_size: DEFAULT_POOL_SIZE, pool_timeout: DEFAULT_POOL_TIMEOUT, auto_reconnect: true) ⇒ Connection

Creates a new connection manager

Parameters:

  • conn_params (String, Hash, Proc)

    connection parameters or callable

  • pool_size (Integer) (defaults to: DEFAULT_POOL_SIZE)

    size of the connection pool

  • pool_timeout (Integer) (defaults to: DEFAULT_POOL_TIMEOUT)

    connection pool timeout in seconds

  • auto_reconnect (Boolean) (defaults to: true)

    automatically reconnect on connection errors

Raises:



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/pgmq/connection.rb', line 130

def initialize(
  conn_params,
  pool_size: DEFAULT_POOL_SIZE,
  pool_timeout: DEFAULT_POOL_TIMEOUT,
  auto_reconnect: true
)
  if conn_params.nil?
    raise(
      PGMQ::Errors::ConfigurationError,
      "Connection parameters are required"
    )
  end

  @conn_params = normalize_connection_params(conn_params)
  @pool_size = pool_size
  @pool_timeout = pool_timeout
  @auto_reconnect = auto_reconnect
  @pool = create_pool
end

Class Attribute Details

.reconnectable_error_classesArray<Class>

Additional exception classes that mean the connection is dead. ‘PG::ConnectionBad` and `PG::UnableToSend` are always matched — this list is appended to them. Subclasses also match.

Thread-safe: reads are lock-free; writes should be done at boot time.

Examples:

PGMQ::Connection.reconnectable_error_classes << PG::ConnectionRefused

Returns:

  • (Array<Class>)


54
55
56
# File 'lib/pgmq/connection.rb', line 54

def reconnectable_error_classes
  @reconnectable_error_classes
end

.reconnectable_error_patternsArray<String, Regexp>

Additional error message patterns (String or Regexp) that mean the connection is dead and a retry on a fresh socket is safe. Strings are matched as case-insensitive substrings; Regexps match the original message. The built-in LOST_CONNECTION_MESSAGES are always checked first — this list is appended to them.

Thread-safe: reads are lock-free (frozen array swap); writes should be done at boot time before forking workers.

Examples:

PGMQ::Connection.reconnectable_error_patterns << "connection reset by peer"
PGMQ::Connection.reconnectable_error_patterns << /\Abroken pipe\b/i

Returns:

  • (Array<String, Regexp>)


43
44
45
# File 'lib/pgmq/connection.rb', line 43

def reconnectable_error_patterns
  @reconnectable_error_patterns
end

Instance Attribute Details

#poolConnectionPool (readonly)

Returns the connection pool.

Returns:

  • (ConnectionPool)

    the connection pool



121
122
123
# File 'lib/pgmq/connection.rb', line 121

def pool
  @pool
end

Instance Method Details

#closevoid

This method returns an undefined value.

Closes all connections in the pool



182
183
184
# File 'lib/pgmq/connection.rb', line 182

def close
  @pool.shutdown { |conn| conn.close unless conn.finished? }
end

#statsHash

Returns connection pool statistics

Examples:

stats = connection.stats
# => { size: 5, available: 3 }

Returns:

  • (Hash)

    statistics about the connection pool



192
193
194
195
196
197
# File 'lib/pgmq/connection.rb', line 192

def stats
  {
    size: @pool_size,
    available: @pool.available
  }
end

#with_connection {|PG::Connection| ... } ⇒ Object

Executes a block with a connection from the pool

Yields:

  • (PG::Connection)

    database connection

Returns:

  • (Object)

    result of the block

Raises:



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/pgmq/connection.rb', line 155

def with_connection
  retries = @auto_reconnect ? 1 : 0
  attempts = 0

  begin
    @pool.with do |conn|
      # Health check: verify connection is alive
      verify_connection!(conn) if @auto_reconnect

      yield conn
    end
  rescue PG::Error => e
    attempts += 1

    # If connection error and auto_reconnect enabled, try once more
    retry if attempts <= retries && connection_lost_error?(e)

    raise PGMQ::Errors::ConnectionError, "Database connection error: #{e.message}"
  rescue ConnectionPool::TimeoutError => e
    raise PGMQ::Errors::ConnectionError, "Connection pool timeout: #{e.message}"
  rescue ConnectionPool::PoolShuttingDownError => e
    raise PGMQ::Errors::ConnectionError, "Connection pool is closed: #{e.message}"
  end
end