Class: PGMQ::Client

Inherits:
Object
Includes:
Autovacuum, Consumer, Maintenance, MessageLifecycle, Metrics, MultiQueue, Producer, QueueManagement, Topics, Transaction
Defined in:
lib/pgmq/client.rb,
lib/pgmq/client/topics.rb,
lib/pgmq/client/metrics.rb,
lib/pgmq/client/consumer.rb,
lib/pgmq/client/producer.rb,
lib/pgmq/client/autovacuum.rb,
lib/pgmq/client/maintenance.rb,
lib/pgmq/client/multi_queue.rb,
lib/pgmq/client/queue_management.rb,
lib/pgmq/client/message_lifecycle.rb

Overview

Low-level client for interacting with PGMQ (Postgres Message Queue).

This is a thin wrapper around PGMQ SQL functions. For higher-level abstractions (job processing, retries, Rails integration), use pgmq-framework.

Examples:

Basic usage

client = PGMQ::Client.new(
  host: 'localhost',
  dbname: 'mydb',
  user: 'postgres',
  password: 'postgres'
)
client.create('my_queue')
msg_id = client.produce('my_queue', '{"data":"value"}')
msg = client.read('my_queue', vt: 30)
client.delete('my_queue', msg.msg_id)

With Rails/ActiveRecord (reuses Rails connection pool)

client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

With connection string

client = PGMQ::Client.new('postgres://localhost/mydb')

Defined Under Namespace

Modules: Autovacuum, Consumer, Maintenance, MessageLifecycle, Metrics, MultiQueue, Producer, QueueManagement, Topics

Constant Summary

DEFAULT_VT = 30

Default visibility timeout in seconds

Constants included from Autovacuum

Autovacuum::DEFAULT_ARCHIVE_SETTINGS, Autovacuum::DEFAULT_QUEUE_SETTINGS

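Instance Attribute Summary

  • #connection ⇒ PGMQ::Connection (readonly)

    Returns the connection manager.

Instance Method Summary

  • #close ⇒ void

    Closes all connections in the pool.

  • #initialize(conn_params = nil, pool_size: Connection::DEFAULT_POOL_SIZE, pool_timeout: Connection::DEFAULT_POOL_TIMEOUT, auto_reconnect: true) ⇒ Client (constructor)

    Creates a new PGMQ client.

  • #stats ⇒ Hash

    Returns connection pool statistics.

  • #with_connection {|PG::Connection| ... } ⇒ Object

    Checks out a pooled connection and yields the raw PG::Connection for arbitrary SQL.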

Methods included from Autovacuum

#tune_autovacuum

Methods included from Topics

#bind_topic, #list_topic_bindings, #produce_batch_topic, #produce_topic, #test_routing, #unbind_topic, #validate_routing_key, #validate_topic_pattern

Methods included from Metrics

#metrics, #metrics_all

Methods included from Maintenance

#convert_archive_partitioned, #disable_notify_insert, #enable_notify_insert, #list_notify_insert_throttles, #purge_queue, #update_notify_insert, #wait_for_notify

Methods included from MessageLifecycle

#archive, #archive_batch, #archive_multi, #delete, #delete_batch, #delete_multi, #pop, #pop_batch, #set_vt, #set_vt_batch, #set_vt_multi

Methods included from MultiQueue

#pop_multi, #read_multi, #read_multi_with_poll

Methods included from Consumer

#read, #read_batch, #read_grouped, #read_grouped_head, #read_grouped_rr, #read_grouped_rr_with_poll, #read_grouped_with_poll, #read_with_poll

Methods included from Producer

#produce, #produce_batch

Methods included from QueueManagement

#create, #create_fifo_index, #create_fifo_indexes_all, #create_partitioned, #create_unlogged, #drop_queue, #list_queues

Methods included from Transaction

#transaction

Constructor Details

#initialize(conn_params = nil, pool_size: Connection::DEFAULT_POOL_SIZE, pool_timeout: Connection::DEFAULT_POOL_TIMEOUT, auto_reconnect: true) ⇒ Client

Creates a new PGMQ client

Examples:

Connection string

client = PGMQ::Client.new('postgres://user:pass@localhost/db')

Connection hash

client = PGMQ::Client.new(host: 'localhost', dbname: 'mydb', user: 'postgres')

Inject existing connection (for Rails)

client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

Disable auto-reconnect

client = PGMQ::Client.new(auto_reconnect: false)

Parameters:

  • conn_params (String, Hash, Proc, PGMQ::Connection, nil) (defaults to: nil)

    connection parameters

  • pool_size (Integer) (defaults to: Connection::DEFAULT_POOL_SIZE)

    connection pool size

  • pool_timeout (Integer) (defaults to: Connection::DEFAULT_POOL_TIMEOUT)

    connection pool timeout in seconds

  • auto_reconnect (Boolean) (defaults to: true)

    automatically reconnect on connection errors



# File 'lib/pgmq/client.rb', line 63

def initialize(
  conn_params = nil,
  pool_size: Connection::DEFAULT_POOL_SIZE,
  pool_timeout: Connection::DEFAULT_POOL_TIMEOUT,
  auto_reconnect: true
)
  @connection = if conn_params.is_a?(Connection)
    conn_params
  else
    Connection.new(
      conn_params,
      pool_size: pool_size,
      pool_timeout: pool_timeout,
      auto_reconnect: auto_reconnect
    )
  end
end

Instance Attribute Details

#connection ⇒ PGMQ::Connection (readonly)

Returns the connection manager.

Returns:

  • (PGMQ::Connection)

# File 'lib/pgmq/client.rb', line 43

def connection
  @connection
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes all connections in the pool



# File 'lib/pgmq/client.rb', line 83

def close
  @connection.close
end
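
Because #close releases every pooled connection, a short-lived script may want to guarantee it runs even when the work in between raises. A minimal sketch, assuming a hypothetical helper named with_client (not part of PGMQ) that accepts any object responding to #close:

```ruby
# Hypothetical helper (not part of PGMQ): yields the client and always
# closes its connection pool afterwards, even if the block raises.
def with_client(client)
  yield client
ensure
  client.close
end
```

A script could wrap its work as with_client(PGMQ::Client.new('postgres://localhost/mydb')) { |c| c.produce('my_queue', '{"data":"value"}') }. The helper returns the block's value, and any exception from the block still propagates after the pool is closed.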

#stats ⇒ Hash

Returns connection pool statistics

Examples:

stats = client.stats
# => { size: 5, available: 3 }

Returns:

  • (Hash)

    statistics about the connection pool



# File 'lib/pgmq/client.rb', line 93

def stats
  @connection.stats
end
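
The example hash for #stats suggests the pool reports its total size and its number of idle connections. A minimal sketch of deriving how many connections are currently checked out, assuming the :size and :available keys shown in that example are present (pool_in_use is a hypothetical helper, not part of PGMQ):

```ruby
# Hypothetical helper (not part of PGMQ): derives the number of
# checked-out connections from a stats hash shaped like
# { size: 5, available: 3 }. Raises KeyError if either key is missing.
def pool_in_use(stats)
  size = stats.fetch(:size)
  available = stats.fetch(:available)
  size - available
end
```

Calling pool_in_use(client.stats) at intervals gives a rough signal of pool pressure. Using fetch rather than [] makes a differently shaped hash fail loudly instead of producing a nil arithmetic error.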

#with_connection {|PG::Connection| ... } ⇒ Object

Note:

You receive the raw PG::Connection. PGMQ performs no type mapping (results come back as strings) and does not wrap your statement in a transaction. Use Transaction#transaction when you need atomicity.

Note:

Do not retain or use the yielded connection outside the block. Once the block returns, the connection goes back to the pool and another thread may check it out; PG::Connection is not thread-safe, so using it after the block can corrupt libpq state (nil results, wrong data, segfaults).

Note:

The connection is returned to the pool without resetting session state. Anything session-scoped you create - LISTEN, SET, a session-level advisory lock (pg_advisory_lock), a prepared statement, a temp table - survives check-in and leaks to the next pool user. Undo it before the block exits (UNLISTEN, RESET, pg_advisory_unlock, etc.). For LISTEN/NOTIFY consumption, prefer PGMQ::Client::Maintenance#wait_for_notify, which manages LISTEN/UNLISTEN for you.

Checks out a pooled connection and yields the raw PG::Connection for arbitrary SQL.

All PGMQ operations run through this method internally, so it carries the same guarantees: a connection is taken from the pool, health-checked when auto_reconnect is enabled, and a single retry on a fresh connection is attempted if the checked-out connection turns out to be dead before any SQL is sent (see PGMQ::Connection#with_connection). The connection is returned to the pool when the block exits.

Exposed so callers can issue PostgreSQL statements PGMQ does not wrap without standing up a second pool: ad-hoc NOTIFY, LISTEN, advisory locks, custom monitoring queries, or DDL that lives alongside your queue tables. Reusing the PGMQ pool keeps connection accounting in one place.

Examples:

Fire a custom NOTIFY on the PGMQ pool

client.with_connection do |conn|
  conn.exec_params("SELECT pg_notify($1, $2)", ["my_channel", payload])
end

Run a query PGMQ does not wrap

depth = client.with_connection do |conn|
  conn.exec("SELECT count(*) FROM pgmq.q_orders")[0]["count"].to_i
end
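
Hold a session-level advisory lock and release it before check-in

The note on session state says that anything session-scoped must be undone before the block exits. A minimal sketch of that pattern for pg_advisory_lock, assuming a hypothetical helper named with_advisory_lock (not part of PGMQ) and an integer lock key chosen by the caller:

```ruby
# Hypothetical helper (not part of PGMQ): runs the given block while
# holding a session-level advisory lock, and always releases the lock
# on the same connection before it goes back to the pool.
def with_advisory_lock(client, key)
  client.with_connection do |conn|
    conn.exec_params("SELECT pg_advisory_lock($1::bigint)", [key])
    begin
      yield conn
    ensure
      # Unlock inside the with_connection block so the lock does not
      # leak to the next pool user.
      conn.exec_params("SELECT pg_advisory_unlock($1::bigint)", [key])
    end
  end
end
```

The unlock sits in an ensure clause nested inside the with_connection block, so it runs on the same connection that took the lock, whether the work succeeds or raises. The yielded connection should still not be retained outside the block.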

Yields:

  • (PG::Connection)

    a pooled, health-checked database connection

Returns:

  • (Object)

    the return value of the block

Raises:



# File 'lib/pgmq/client.rb', line 134

def with_connection(&)
  @connection.with_connection(&)
end