Class: PGMQ::Connection

Inherits:
Object
Defined in:
lib/pgmq/connection.rb

Overview

Manages database connections for PGMQ

Supports multiple connection strategies:

  • Connection strings

  • Hash of connection parameters

  • Callable objects (for Rails ActiveRecord integration)

Examples:

With connection string

conn = PGMQ::Connection.new("postgres://localhost/mydb")

With connection hash

conn = PGMQ::Connection.new(host: 'localhost', dbname: 'mydb')

With Rails ActiveRecord (reuses Rails connection pool)

conn = PGMQ::Connection.new(-> { ActiveRecord::Base.connection.raw_connection })
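The first two strategies can be selected at runtime. The following is a minimal sketch, not part of PGMQ: `pgmq_conn_params` is a hypothetical helper, and the environment variable names (`DATABASE_URL`, `PGHOST`, `PGDATABASE`) are assumptions about how an application might be configured. It only builds the first argument for `PGMQ::Connection.new`; the `host:` and `dbname:` keys are taken from the hash example above.

```ruby
# Hypothetical helper (not part of PGMQ): picks a connection strategy
# from an environment-like hash.
#
# Returns a String when a URL is configured, otherwise a Hash of
# connection parameters. Either value can be passed as the first
# argument to PGMQ::Connection.new.
def pgmq_conn_params(env)
  url = env["DATABASE_URL"]
  return url if url && !url.empty?

  {
    host: env.fetch("PGHOST", "localhost"), # assumed default host
    dbname: env.fetch("PGDATABASE")         # raises KeyError when missing
  }
end

params = pgmq_conn_params({ "PGHOST" => "localhost", "PGDATABASE" => "mydb" })
# conn = PGMQ::Connection.new(params)
```

In a real application the argument would typically be `ENV`, which also responds to `[]` and `fetch`.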

Constant Summary

DEFAULT_POOL_SIZE = 5

Default connection pool size

DEFAULT_POOL_TIMEOUT = 5

Default connection pool timeout in seconds


Constructor Details

#initialize(conn_params, pool_size: DEFAULT_POOL_SIZE, pool_timeout: DEFAULT_POOL_TIMEOUT, auto_reconnect: true) ⇒ Connection

Creates a new connection manager

Parameters:

  • conn_params (String, Hash, Proc)

    connection parameters or callable

  • pool_size (Integer) (defaults to: DEFAULT_POOL_SIZE)

    size of the connection pool

  • pool_timeout (Integer) (defaults to: DEFAULT_POOL_TIMEOUT)

    connection pool timeout in seconds

  • auto_reconnect (Boolean) (defaults to: true)

    automatically reconnect on connection errors

Raises:

  • (PGMQ::Errors::ConfigurationError)

    if conn_params is nil



# File 'lib/pgmq/connection.rb', line 39

def initialize(
  conn_params,
  pool_size: DEFAULT_POOL_SIZE,
  pool_timeout: DEFAULT_POOL_TIMEOUT,
  auto_reconnect: true
)
  if conn_params.nil?
    raise(
      PGMQ::Errors::ConfigurationError,
      "Connection parameters are required"
    )
  end

  @conn_params = normalize_connection_params(conn_params)
  @pool_size = pool_size
  @pool_timeout = pool_timeout
  @auto_reconnect = auto_reconnect
  @pool = create_pool
end

Instance Attribute Details

#pool ⇒ ConnectionPool (readonly)

Returns the connection pool.

Returns:

  • (ConnectionPool)

    the connection pool



# File 'lib/pgmq/connection.rb', line 30

def pool
  @pool
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes all connections in the pool



# File 'lib/pgmq/connection.rb', line 91

def close
  @pool.shutdown { |conn| conn.close unless conn.finished? }
end
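Since `close` has to be called explicitly, one option is to wrap usage in an `ensure` block so the pool is shut down even when an error occurs. This is a sketch only: `with_closing` is a hypothetical helper that is not part of PGMQ, and `FakeManager` is a stand-in that exists purely so the example runs without a database.

```ruby
# Hypothetical helper (not part of PGMQ): yields the connection manager
# and closes it afterwards, even if the block raises.
def with_closing(manager)
  yield manager
ensure
  manager.close
end

# Stand-in object, used only to make this sketch runnable without a
# database. A real caller would pass a PGMQ::Connection instead.
class FakeManager
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

manager = FakeManager.new
with_closing(manager) do |m|
  # work with the manager here; it is still open at this point
  m.closed
end
# manager.closed is true once the block has finished
```

With the real class, the same helper could wrap `PGMQ::Connection.new("postgres://localhost/mydb")`, with calls to `with_connection` placed inside the block.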

#stats ⇒ Hash

Returns connection pool statistics

Examples:

stats = connection.stats
# => { size: 5, available: 3 }

Returns:

  • (Hash)

    statistics about the connection pool



# File 'lib/pgmq/connection.rb', line 101

def stats
  {
    size: @pool_size,
    available: @pool.available
  }
end
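The two values returned by `stats` are enough to derive how many connections are checked out. The sketch below assumes only the hash shape shown in the example (`:size` and `:available`); `pool_usage` is a hypothetical helper, not part of PGMQ. Note that `:size` is the configured pool size, as the source above shows.

```ruby
# Hypothetical helper (not part of PGMQ): derives the number of
# checked-out connections and the utilization ratio from a stats hash
# shaped like the return value of PGMQ::Connection#stats.
def pool_usage(stats)
  size = stats.fetch(:size)
  available = stats.fetch(:available)
  in_use = size - available

  {
    in_use: in_use,
    # Guard against division by zero for an empty pool.
    utilization: size.zero? ? 0.0 : in_use.to_f / size
  }
end

usage = pool_usage({ size: 5, available: 3 })
# usage[:in_use] is 2, and usage[:utilization] is 2 out of 5 (0.4)
```

A caller might log a warning when the utilization stays close to 1.0, since further `with_connection` calls would then wait up to the pool timeout.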

#with_connection {|PG::Connection| ... } ⇒ Object

Executes a block with a connection from the pool

Yields:

  • (PG::Connection)

    database connection

Returns:

  • (Object)

    result of the block

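Raises:

  • (PGMQ::Errors::ConnectionError)

    if the database connection fails, the pool times out, or the pool has been shut down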



# File 'lib/pgmq/connection.rb', line 64

def with_connection
  retries = @auto_reconnect ? 1 : 0
  attempts = 0

  begin
    @pool.with do |conn|
      # Health check: verify connection is alive
      verify_connection!(conn) if @auto_reconnect

      yield conn
    end
  rescue PG::Error => e
    attempts += 1

    # If connection error and auto_reconnect enabled, try once more
    retry if attempts <= retries && connection_lost_error?(e)

    raise PGMQ::Errors::ConnectionError, "Database connection error: #{e.message}"
  rescue ConnectionPool::TimeoutError => e
    raise PGMQ::Errors::ConnectionError, "Connection pool timeout: #{e.message}"
  rescue ConnectionPool::PoolShuttingDownError => e
    raise PGMQ::Errors::ConnectionError, "Connection pool is closed: #{e.message}"
  end
end
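The retry logic above relies on Ruby's `retry` keyword, which re-runs the whole `begin` block, including the `yield`. The following standalone sketch isolates that pattern so it can be run without a database. `FakeConnectionLost` and `with_retry` are hypothetical names used for illustration; they stand in for `PG::Error` plus the `connection_lost_error?` check and are not part of PGMQ or the pg gem.

```ruby
# Stand-in for a lost-connection error; NOT part of pg or PGMQ.
class FakeConnectionLost < StandardError; end

# Runs the block and retries it at most `retries` times when it raises
# FakeConnectionLost. Any further failure is re-raised to the caller.
def with_retry(retries: 1)
  attempts = 0

  begin
    yield
  rescue FakeConnectionLost
    attempts += 1
    # `retry` jumps back to `begin`, so the block is executed again.
    retry if attempts <= retries
    raise
  end
end

calls = 0
result = with_retry do
  calls += 1
  raise FakeConnectionLost, "server closed the connection" if calls == 1

  :ok
end
# The first attempt fails, the retry succeeds: result is :ok, calls is 2
```

One consequence of this design is worth keeping in mind: a block passed to `with_connection` may run a second time after a lost connection, so it is safest when its work is idempotent.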