Class: PGMQ::Client

Inherits:
Object
Includes:
Consumer, Maintenance, MessageLifecycle, Metrics, MultiQueue, Producer, QueueManagement, Topics, Transaction
Defined in:
lib/pgmq/client.rb,
lib/pgmq/client/topics.rb,
lib/pgmq/client/metrics.rb,
lib/pgmq/client/consumer.rb,
lib/pgmq/client/producer.rb,
lib/pgmq/client/maintenance.rb,
lib/pgmq/client/multi_queue.rb,
lib/pgmq/client/queue_management.rb,
lib/pgmq/client/message_lifecycle.rb

Overview

Low-level client for interacting with PGMQ (Postgres Message Queue).

This is a thin wrapper around PGMQ SQL functions. For higher-level abstractions (job processing, retries, Rails integration), use pgmq-framework.

Examples:

Basic usage

client = PGMQ::Client.new(
  host: 'localhost',
  dbname: 'mydb',
  user: 'postgres',
  password: 'postgres'
)
client.create('my_queue')
msg_id = client.produce('my_queue', '{"data":"value"}')
msg = client.read('my_queue', vt: 30)
client.delete('my_queue', msg.msg_id)

With Rails/ActiveRecord (reuses Rails connection pool)

client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

With connection string

client = PGMQ::Client.new('postgres://localhost/mydb')

Defined Under Namespace

Modules: Consumer, Maintenance, MessageLifecycle, Metrics, MultiQueue, Producer, QueueManagement, Topics

Constant Summary

DEFAULT_VT = 30

Default visibility timeout in seconds.
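To make the visibility timeout concrete, here is a toy in-memory model of the behavior. It is a sketch only: `ToyQueue` and its methods are hypothetical, PGMQ itself implements this logic in SQL inside Postgres, and the clock is passed in explicitly (`now:`) so the outcome is deterministic.

```ruby
# Toy model of a visibility timeout. Hypothetical class, for illustration
# only; it does not use or mirror the library's internals.
class ToyQueue
  Message = Struct.new(:msg_id, :body, :visible_at)

  def initialize
    @messages = []
    @next_id = 1
  end

  # Enqueue a message that is visible immediately.
  def produce(body, now:)
    msg = Message.new(@next_id, body, now)
    @next_id += 1
    @messages << msg
    msg.msg_id
  end

  # Return the first visible message and hide it for vt seconds.
  def read(vt:, now:)
    msg = @messages.find { |m| m.visible_at <= now }
    return nil unless msg

    msg.visible_at = now + vt
    msg
  end

  # Remove a message permanently; returns true if it existed.
  def delete(msg_id)
    !@messages.reject! { |m| m.msg_id == msg_id }.nil?
  end
end

queue = ToyQueue.new
id = queue.produce('{"data":"value"}', now: 0)

first  = queue.read(vt: 30, now: 0)   # message is handed out and hidden
hidden = queue.read(vt: 30, now: 10)  # still inside the 30-second window
again  = queue.read(vt: 30, now: 30)  # window elapsed, message is redelivered
```

A consumer that finishes within the timeout deletes the message; one that crashes simply lets the timeout lapse, after which another consumer can pick the message up.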

Instance Attribute Summary

Instance Method Summary

Methods included from Topics

#bind_topic, #list_topic_bindings, #produce_batch_topic, #produce_topic, #test_routing, #unbind_topic, #validate_routing_key, #validate_topic_pattern

Methods included from Metrics

#metrics, #metrics_all

Methods included from Maintenance

#disable_notify_insert, #enable_notify_insert, #purge_queue

Methods included from MessageLifecycle

#archive, #archive_batch, #archive_multi, #delete, #delete_batch, #delete_multi, #pop, #pop_batch, #set_vt, #set_vt_batch, #set_vt_multi

Methods included from MultiQueue

#pop_multi, #read_multi, #read_multi_with_poll

Methods included from Consumer

#read, #read_batch, #read_grouped_rr, #read_grouped_rr_with_poll, #read_with_poll

Methods included from Producer

#produce, #produce_batch

Methods included from QueueManagement

#create, #create_partitioned, #create_unlogged, #drop_queue, #list_queues

Methods included from Transaction

#transaction

Constructor Details

#initialize(conn_params = nil, pool_size: Connection::DEFAULT_POOL_SIZE, pool_timeout: Connection::DEFAULT_POOL_TIMEOUT, auto_reconnect: true) ⇒ Client

Creates a new PGMQ client.

Examples:

Connection string

client = PGMQ::Client.new('postgres://user:pass@localhost/db')

Connection hash

client = PGMQ::Client.new(host: 'localhost', dbname: 'mydb', user: 'postgres')

Inject existing connection (for Rails)

client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })

Disable auto-reconnect

client = PGMQ::Client.new(auto_reconnect: false)

Parameters:

  • conn_params (String, Hash, Proc, PGMQ::Connection, nil) (defaults to: nil)

    connection parameters

  • pool_size (Integer) (defaults to: Connection::DEFAULT_POOL_SIZE)

    connection pool size

  • pool_timeout (Integer) (defaults to: Connection::DEFAULT_POOL_TIMEOUT)

    connection pool timeout in seconds

  • auto_reconnect (Boolean) (defaults to: true)

    whether to reconnect automatically on connection errors



# File 'lib/pgmq/client.rb', line 62

def initialize(
  conn_params = nil,
  pool_size: Connection::DEFAULT_POOL_SIZE,
  pool_timeout: Connection::DEFAULT_POOL_TIMEOUT,
  auto_reconnect: true
)
  @connection = if conn_params.is_a?(Connection)
    conn_params
  else
    Connection.new(
      conn_params,
      pool_size: pool_size,
      pool_timeout: pool_timeout,
      auto_reconnect: auto_reconnect
    )
  end
end
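The constructor body reuses a `PGMQ::Connection` when it is given one and otherwise builds a new one from the raw parameters. The sketch below mirrors that branch with hypothetical stand-in classes (`SketchConnection`, `SketchClient`) so that it runs without a database. The default pool size of 5 is an assumed value chosen for the illustration, not taken from the library.

```ruby
# Stand-ins for illustration only; these are not the library's classes.
class SketchConnection
  attr_reader :params, :pool_size

  # pool_size default of 5 is an assumption made for this sketch.
  def initialize(params, pool_size: 5)
    @params = params
    @pool_size = pool_size
  end
end

class SketchClient
  attr_reader :connection

  # Same shape as the constructor above: reuse an existing connection
  # object, otherwise wrap the raw parameters in a new one.
  def initialize(conn_params = nil, pool_size: 5)
    @connection = if conn_params.is_a?(SketchConnection)
      conn_params
    else
      SketchConnection.new(conn_params, pool_size: pool_size)
    end
  end
end

shared = SketchConnection.new('postgres://localhost/mydb', pool_size: 10)
a = SketchClient.new(shared)
b = SketchClient.new(shared)                       # shares the pool with a
c = SketchClient.new('postgres://localhost/mydb')  # gets its own connection
```

One consequence visible in the source above: when an existing connection object is passed, the `pool_size`, `pool_timeout`, and `auto_reconnect` keywords are not consulted, because the connection was configured when it was created.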

Instance Attribute Details

#connection ⇒ PGMQ::Connection (readonly)

Returns the connection manager.

Returns:

  • (PGMQ::Connection)

    the connection manager



# File 'lib/pgmq/client.rb', line 42

def connection
  @connection
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes all connections in the pool.



# File 'lib/pgmq/client.rb', line 82

def close
  @connection.close
end
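Because `#close` releases every pooled connection, short-lived scripts may want to guarantee it runs even when an error occurs. A minimal sketch of that pattern follows. `with_client` is a hypothetical helper, not a library method, and `StubClient` is a stand-in so the example runs without a database.

```ruby
# Hypothetical helper: yields the client and always closes it afterwards.
# Works with any object that responds to #close.
def with_client(client)
  yield client
ensure
  client.close
end

# Stand-in client so the sketch runs without a database.
class StubClient
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

stub = StubClient.new
result = with_client(stub) { |c| c.closed ? :already_closed : :open_during_block }
```

The `ensure` clause runs whether the block returns normally or raises, and the helper still returns the block's value.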

#stats ⇒ Hash

Returns connection pool statistics.

Examples:

stats = client.stats
# => { size: 5, available: 3 }

Returns:

  • (Hash)

    statistics about the connection pool



# File 'lib/pgmq/client.rb', line 92

def stats
  @connection.stats
end
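The returned hash can be turned into simple usage figures for logging or alerting. The sketch below assumes the hash has the `:size` and `:available` keys shown in the example above; `pool_usage` is a hypothetical helper, not part of the library.

```ruby
# Hypothetical helper: derives usage figures from a stats hash shaped like
# the example above ({ size: ..., available: ... }).
def pool_usage(stats)
  size = stats.fetch(:size)
  available = stats.fetch(:available)
  in_use = size - available
  # Guard against an empty pool to avoid dividing by zero.
  { in_use: in_use, utilization: size.zero? ? 0.0 : in_use.fdiv(size) }
end

usage = pool_usage({ size: 5, available: 3 })
# usage[:in_use] is 2; usage[:utilization] is 2 / 5 as a Float
```

Using `fetch` rather than `[]` makes the helper fail loudly with a `KeyError` if the hash does not have the assumed shape.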