Module: PGMQ::Client::Maintenance
- Included in:
- PGMQ::Client
- Defined in:
- lib/pgmq/client/maintenance.rb
Overview
Queue maintenance operations
This module handles queue maintenance tasks such as purging messages and detaching archive tables.
Instance Method Summary collapse
-
#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ void
Converts a standard queue’s archive table to a pg_partman-managed partitioned table.
-
#disable_notify_insert(queue_name) ⇒ void
Disables PostgreSQL NOTIFY for a queue.
-
#enable_notify_insert(queue_name, throttle_interval_ms: 250) ⇒ void
Enables PostgreSQL NOTIFY when messages are inserted into a queue.
-
#list_notify_insert_throttles ⇒ Array<PGMQ::NotifyThrottle>
Lists all queues that have a NOTIFY trigger enabled, with their throttle configuration.
-
#purge_queue(queue_name) ⇒ Integer
Purges all messages from a queue.
-
#update_notify_insert(queue_name, throttle_interval_ms:) ⇒ void
Updates the throttle interval for an already-enabled NOTIFY trigger.
-
#wait_for_notify(queue_name, timeout: nil) ⇒ String?
Blocks until a PostgreSQL NOTIFY arrives on the queue’s channel or the timeout expires.
Instance Method Details
#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ void
Requires the ‘pg_partman` PostgreSQL extension. If pg_partman is not installed and the archive table exists, the call raises `PGMQ::Errors::ConnectionError`. If the archive table does not exist the call succeeds (returns nil) without touching pg_partman, so no extension is needed in that case.
This method returns an undefined value.
Converts a standard queue’s archive table to a pg_partman-managed partitioned table
Provides a migration path for queues originally created with ‘create` or `create_unlogged` whose archive tables have grown large enough to benefit from partitioning. The queue message table is not affected - only the archive table (`pgmq.a_<queue_name>`) is converted.
The operation renames the existing archive table to ‘pgmq.a_<queue_name>_old`, creates a new partitioned table with the same schema, and hands it over to pg_partman for lifecycle management. **Existing archived rows are left in the `_old` table and must be migrated manually** if visibility in the new partitioned archive is needed. If the archive table is already partitioned the function returns without error (idempotent). If the archive table does not exist it also returns without error.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/pgmq/client/maintenance.rb', line 90 def convert_archive_partitioned( queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10 ) validate_queue_name!(queue_name) with_connection do |conn| conn.exec_params( "SELECT pgmq.convert_archive_partitioned($1::text, $2::text, $3::text, $4::integer)", [queue_name, partition_interval, retention_interval, leading_partition] ) end nil end |
#disable_notify_insert(queue_name) ⇒ void
This method returns an undefined value.
Disables PostgreSQL NOTIFY for a queue
163 164 165 166 167 168 169 170 171 |
# File 'lib/pgmq/client/maintenance.rb', line 163 def disable_notify_insert(queue_name) validate_queue_name!(queue_name) with_connection do |conn| conn.exec_params("SELECT pgmq.disable_notify_insert($1::text)", [queue_name]) end nil end |
#enable_notify_insert(queue_name, throttle_interval_ms: 250) ⇒ void
This method returns an undefined value.
Enables PostgreSQL NOTIFY when messages are inserted into a queue
When enabled, PostgreSQL will send a NOTIFY event on message insert, allowing clients to use LISTEN instead of polling. The throttle interval prevents notification storms during high-volume inserts.
44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/pgmq/client/maintenance.rb', line 44 def enable_notify_insert(queue_name, throttle_interval_ms: 250) validate_queue_name!(queue_name) with_connection do |conn| conn.exec_params( "SELECT pgmq.enable_notify_insert($1::text, $2::integer)", [queue_name, throttle_interval_ms] ) end nil end |
#list_notify_insert_throttles ⇒ Array<PGMQ::NotifyThrottle>
Lists all queues that have a NOTIFY trigger enabled, with their throttle configuration
Returns one NotifyThrottle per queue that has had #enable_notify_insert called on it. Useful for auditing notification configuration across all queues at once.
120 121 122 123 124 125 126 |
# File 'lib/pgmq/client/maintenance.rb', line 120 def list_notify_insert_throttles result = with_connection do |conn| conn.exec("SELECT * FROM pgmq.list_notify_insert_throttles()") end result.map { |row| PGMQ::NotifyThrottle.new(row) } end |
#purge_queue(queue_name) ⇒ Integer
Purges all messages from a queue
17 18 19 20 21 22 23 24 25 |
# File 'lib/pgmq/client/maintenance.rb', line 17 def purge_queue(queue_name) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params("SELECT pgmq.purge_queue($1::text)", [queue_name]) end result[0]["purge_queue"] end |
#update_notify_insert(queue_name, throttle_interval_ms:) ⇒ void
This method returns an undefined value.
Updates the throttle interval for an already-enabled NOTIFY trigger
Changes how frequently PostgreSQL is allowed to fire a NOTIFY event on message insert, without having to disable and re-enable the trigger. The queue must already have notifications enabled via #enable_notify_insert.
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/pgmq/client/maintenance.rb', line 143 def update_notify_insert(queue_name, throttle_interval_ms:) validate_queue_name!(queue_name) with_connection do |conn| conn.exec_params( "SELECT pgmq.update_notify_insert($1::text, $2::integer)", [queue_name, throttle_interval_ms] ) end nil end |
#wait_for_notify(queue_name, timeout: nil) ⇒ String?
Orchestration (retry loop, reconnect-on-drop, graceful shutdown) is the caller’s responsibility. This method is a thin primitive — it listens once, waits, and returns.
Blocks until a PostgreSQL NOTIFY arrives on the queue’s channel or the timeout expires.
PGMQ sends notifications on the channel ‘pgmq.q_<queue_name>.INSERT` whenever a message is inserted (requires #enable_notify_insert to be called first). This method issues `LISTEN`, waits for a notification via the `pg` gem’s ‘wait_for_notify`, then issues `UNLISTEN` before returning the connection to the pool.
Compared with Consumer#read_with_poll, which holds the connection inside a PL/pgSQL loop for the full poll window, ‘wait_for_notify` releases the connection the moment a notification arrives (or the timeout expires). This makes it more efficient under low message rates where the poll window would otherwise burn idle time.
The optional block receives ‘channel`, `backend_pid`, and `payload` when a notification arrives. The return value mirrors `PG::Connection#wait_for_notify`: the channel name string on success, or `nil` on timeout.
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/pgmq/client/maintenance.rb', line 208 def wait_for_notify(queue_name, timeout: nil) validate_queue_name!(queue_name) # PGMQ trigger fires pg_notify('pgmq.q_<queue>.INSERT', NULL) channel = "pgmq.q_#{queue_name}.INSERT" with_connection do |conn| conn.exec("LISTEN \"#{channel}\"") begin conn.wait_for_notify(timeout) do |ch, pid, payload| yield ch, pid, payload if block_given? end ensure conn.exec("UNLISTEN \"#{channel}\"") end end end |