Module: PGMQ::Client::QueueManagement
Included in: PGMQ::Client
Defined in: lib/pgmq/client/queue_management.rb
Overview
Queue management operations (create, drop, list)
This module handles all queue lifecycle operations including creating queues (standard, partitioned, unlogged), dropping queues, and listing existing queues.
Instance Method Summary
- #create(queue_name) ⇒ Boolean
  Creates a new queue.
- #create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000") ⇒ Boolean
  Creates a partitioned queue.
- #create_unlogged(queue_name) ⇒ Boolean
  Creates an unlogged queue for higher throughput (no crash recovery).
- #drop_queue(queue_name) ⇒ Boolean
  Drops a queue and its archive table.
- #list_queues ⇒ Array<PGMQ::QueueMetadata>
  Lists all queues.
Instance Method Details
#create(queue_name) ⇒ Boolean
Creates a new queue. Returns true if the queue did not exist before the call, false if it already existed.
# File 'lib/pgmq/client/queue_management.rb', line 20

def create(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create($1::text)", [queue_name])
    !existed
  end
end
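The listing above checks whether the queue exists, issues the create call either way, and returns the negated check. A minimal sketch of how a caller might use that Boolean follows. `ensure_queue` and `FakeQueueClient` are hypothetical names introduced for illustration only; the stand-in replaces a real `PGMQ::Client` so the example runs without a database.

```ruby
# Hypothetical helper (not part of the gem): turns the Boolean returned by
# #create into a descriptive symbol.
def ensure_queue(client, queue_name)
  client.create(queue_name) ? :created : :already_present
end

# In-memory stand-in that imitates the check-then-create flow of the
# listing above. It is an assumption for demonstration, not the gem's code.
class FakeQueueClient
  def initialize
    @queues = []
  end

  def create(queue_name)
    existed = @queues.include?(queue_name)
    @queues << queue_name unless existed
    !existed
  end
end

client = FakeQueueClient.new
puts ensure_queue(client, "orders") # prints "created"
puts ensure_queue(client, "orders") # prints "already_present"
```

Because the helper only relies on `create` returning a Boolean, it should work unchanged with any object that follows the contract documented here.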
#create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000") ⇒ Boolean
Creates a partitioned queue. Requires the pg_partman extension to be installed. Returns true if the queue did not exist before the call, false if it already existed.
# File 'lib/pgmq/client/queue_management.rb', line 44

def create_partitioned(
  queue_name,
  partition_interval: "10000",
  retention_interval: "100000"
)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params(
      "SELECT pgmq.create_partitioned($1::text, $2::text, $3::text)",
      [queue_name, partition_interval, retention_interval]
    )
    !existed
  end
end
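Both intervals default to Strings and are bound as `::text` parameters, so a caller holding Integers may want to convert them first. The sketch below shows one way to do that. `partitioned_options` and `RecordingClient` are hypothetical; the stand-in only copies the documented signature and records its arguments, so the example runs without PostgreSQL or pg_partman. How PGMQ interprets each value (for example message counts versus time intervals) is not covered by this section and should be checked against the PGMQ version in use.

```ruby
# Hypothetical helper: converts interval values to the String form used by
# the keyword defaults of #create_partitioned.
def partitioned_options(partition_interval:, retention_interval:)
  {
    partition_interval: partition_interval.to_s,
    retention_interval: retention_interval.to_s
  }
end

# Stand-in with the same signature as the documented method (an assumption
# for demonstration). It records arguments instead of running SQL.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000")
    @calls << [queue_name, partition_interval, retention_interval]
    true
  end
end

client = RecordingClient.new
opts = partitioned_options(partition_interval: 5_000, retention_interval: 50_000)
client.create_partitioned("events", **opts)
p client.calls.last # prints ["events", "5000", "50000"]
```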
#create_unlogged(queue_name) ⇒ Boolean
Creates an unlogged queue for higher throughput (no crash recovery). Returns true if the queue did not exist before the call, false if it already existed.
# File 'lib/pgmq/client/queue_management.rb', line 68

def create_unlogged(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create_unlogged($1::text)", [queue_name])
    !existed
  end
end
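Since `#create` and `#create_unlogged` take the same argument and return the same kind of Boolean, the choice between them can be reduced to a single durability flag. The following is a sketch under that assumption. `create_queue` and `MethodRecorder` are hypothetical names, and the stand-in only records which method was called.

```ruby
# Hypothetical helper: picks the creation method from a durability flag.
# Unlogged queues trade crash recovery for throughput, so durable is the default.
def create_queue(client, queue_name, durable: true)
  durable ? client.create(queue_name) : client.create_unlogged(queue_name)
end

# Stand-in client (an assumption for demonstration) that logs each call.
class MethodRecorder
  attr_reader :log

  def initialize
    @log = []
  end

  def create(queue_name)
    @log << [:create, queue_name]
    true
  end

  def create_unlogged(queue_name)
    @log << [:create_unlogged, queue_name]
    true
  end
end

client = MethodRecorder.new
create_queue(client, "payments")
create_queue(client, "metrics", durable: false)
p client.log # prints [[:create, "payments"], [:create_unlogged, "metrics"]]
```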
#drop_queue(queue_name) ⇒ Boolean
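Drops a queue and its archive table. Returns true if PGMQ reports the queue as dropped, false otherwise (including when the call returns no rows).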
# File 'lib/pgmq/client/queue_management.rb', line 85

def drop_queue(queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params("SELECT pgmq.drop_queue($1::text)", [queue_name])
  end

  return false if result.ntuples.zero?

  result[0]["drop_queue"] == "t"
end
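The comparison with `"t"` in the listing above is there because the result row carries the Boolean as text. The decision logic can be sketched with plain Ruby data. `dropped?` is a hypothetical helper, and `rows` is an Array of Hashes standing in for the query result, so `rows.empty?` plays the role of `result.ntuples.zero?`.

```ruby
# Hypothetical helper mirroring the return logic of #drop_queue.
# `rows` stands in for the query result: an Array of Hashes with String values.
def dropped?(rows)
  # No row at all is treated as "nothing was dropped".
  return false if rows.empty?

  # The Boolean arrives as text: "t" for true, "f" for false.
  rows[0]["drop_queue"] == "t"
end

p dropped?([{ "drop_queue" => "t" }]) # prints true
p dropped?([{ "drop_queue" => "f" }]) # prints false
p dropped?([])                        # prints false
```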
#list_queues ⇒ Array<PGMQ::QueueMetadata>
Lists all queues
# File 'lib/pgmq/client/queue_management.rb', line 104

def list_queues
  result = with_connection do |conn|
    conn.exec("SELECT * FROM pgmq.list_queues()")
  end

  result.map { |row| QueueMetadata.new(row) }
end
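The listing above wraps each result row in a metadata object. The sketch below imitates that mapping with a stand-in class, since `PGMQ::QueueMetadata` itself is not shown in this section. `QueueInfo`, `queue_names`, and the column names `queue_name` and `is_unlogged` are assumptions for illustration and should be checked against the actual class and the `pgmq.list_queues()` output.

```ruby
# Hypothetical stand-in for PGMQ::QueueMetadata. The real class, its
# constructor, and its accessors are not documented in this section.
class QueueInfo
  attr_reader :queue_name

  def initialize(row)
    @queue_name = row["queue_name"]      # assumed column name
    @unlogged = row["is_unlogged"] == "t" # assumed column name, text Boolean
  end

  def unlogged?
    @unlogged
  end
end

# Hypothetical helper: wraps each row, as #list_queues does, and returns
# the sorted queue names.
def queue_names(rows)
  rows.map { |row| QueueInfo.new(row) }.map(&:queue_name).sort
end

rows = [
  { "queue_name" => "payments", "is_unlogged" => "f" },
  { "queue_name" => "metrics",  "is_unlogged" => "t" }
]
p queue_names(rows)                 # prints ["metrics", "payments"]
p QueueInfo.new(rows[1]).unlogged?  # prints true
```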