Module: PGMQ::Client::QueueManagement
- Included in:
- PGMQ::Client
- Defined in:
- lib/pgmq/client/queue_management.rb
Overview
Queue management operations (create, drop, list)
This module handles all queue lifecycle operations including creating queues (standard, partitioned, unlogged), dropping queues, and listing existing queues.
Instance Method Summary collapse
-
#create(queue_name, tune_autovacuum: false) ⇒ Boolean
Creates a new queue.
-
#create_fifo_index(queue_name) ⇒ void
Creates the FIFO index on a queue’s table required for grouped reads.
-
#create_fifo_indexes_all ⇒ void
Creates FIFO indexes on all existing queues.
-
#create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", tune_autovacuum: false) ⇒ Boolean
Creates a partitioned queue.
-
#create_unlogged(queue_name, tune_autovacuum: false) ⇒ Boolean
Creates an unlogged queue for higher throughput (no crash recovery).
-
#drop_queue(queue_name) ⇒ Boolean
Drops a queue and its archive table.
-
#list_queues ⇒ Array<PGMQ::QueueMetadata>
Lists all queues.
Instance Method Details
#create(queue_name, tune_autovacuum: false) ⇒ Boolean
Creates a new queue
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/pgmq/client/queue_management.rb', line 26 def create(queue_name, tune_autovacuum: false) validate_queue_name!(queue_name) with_connection do |conn| existed = queue_exists?(conn, queue_name) conn.exec_params("SELECT pgmq.create($1::text)", [queue_name]) apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum) !existed end end |
#create_fifo_index(queue_name) ⇒ void
This method returns an undefined value.
Creates the FIFO index on a queue’s table required for grouped reads
Grouped read operations (‘read_grouped`, `read_grouped_rr`, `read_grouped_head`) rely on this index for correct ordering and acceptable query performance. Without it, grouped reads will work but may be slow or return incorrect ordering at scale. The operation is idempotent - calling it on a queue that already has the index is safe.
128 129 130 131 132 133 134 135 136 |
# File 'lib/pgmq/client/queue_management.rb', line 128 def create_fifo_index(queue_name) validate_queue_name!(queue_name) with_connection do |conn| conn.exec_params("SELECT pgmq.create_fifo_index($1::text)", [queue_name]) end nil end |
#create_fifo_indexes_all ⇒ void
This method returns an undefined value.
Creates FIFO indexes on all existing queues
Convenience wrapper that calls ‘create_fifo_index` for every queue registered in `pgmq.meta`. Useful for one-time migrations when adding grouped reads to an existing deployment. The operation is idempotent.
148 149 150 151 152 153 154 |
# File 'lib/pgmq/client/queue_management.rb', line 148 def create_fifo_indexes_all with_connection do |conn| conn.exec("SELECT pgmq.create_fifo_indexes_all()") end nil end |
#create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", tune_autovacuum: false) ⇒ Boolean
Creates a partitioned queue
Requires pg_partman extension to be installed
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pgmq/client/queue_management.rb', line 54 def create_partitioned( queue_name, partition_interval: "10000", retention_interval: "100000", tune_autovacuum: false ) validate_queue_name!(queue_name) with_connection do |conn| existed = queue_exists?(conn, queue_name) conn.exec_params( "SELECT pgmq.create_partitioned($1::text, $2::text, $3::text)", [queue_name, partition_interval, retention_interval] ) apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum) !existed end end |
#create_unlogged(queue_name, tune_autovacuum: false) ⇒ Boolean
Creates an unlogged queue for higher throughput (no crash recovery)
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/pgmq/client/queue_management.rb', line 83 def create_unlogged(queue_name, tune_autovacuum: false) validate_queue_name!(queue_name) with_connection do |conn| existed = queue_exists?(conn, queue_name) conn.exec_params("SELECT pgmq.create_unlogged($1::text)", [queue_name]) apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum) !existed end end |
#drop_queue(queue_name) ⇒ Boolean
Drops a queue and its archive table
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/pgmq/client/queue_management.rb', line 101 def drop_queue(queue_name) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params("SELECT pgmq.drop_queue($1::text)", [queue_name]) end return false if result.ntuples.zero? result[0]["drop_queue"] == "t" end |
#list_queues ⇒ Array<PGMQ::QueueMetadata>
Lists all queues
163 164 165 166 167 168 169 |
# File 'lib/pgmq/client/queue_management.rb', line 163 def list_queues result = with_connection do |conn| conn.exec("SELECT * FROM pgmq.list_queues()") end result.map { |row| QueueMetadata.new(row) } end |