Module: PGMQ::Client::QueueManagement

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/queue_management.rb

Overview

Queue management operations (create, drop, list)

This module handles all queue lifecycle operations including creating queues (standard, partitioned, unlogged), dropping queues, and listing existing queues.

Instance Method Summary collapse

Instance Method Details

#create(queue_name, tune_autovacuum: false) ⇒ Boolean

Creates a new queue

Examples:

client.create("orders")  # => true (created)
client.create("orders")  # => false (already exists)

Create with tuned autovacuum

client.create("orders", tune_autovacuum: true)

Parameters:

  • queue_name (String)

    name of the queue to create

  • tune_autovacuum (Boolean, Hash) (defaults to: false)

    when truthy, tune autovacuum on the new queue’s tables after creation. Pass true for PGMQ-tuned defaults, or a Hash of options forwarded to Autovacuum#tune_autovacuum (e.g. {scale_factor: 0.005, archive: false}). Defaults to false (no change).

Returns:

  • (Boolean)

    true if queue was created, false if it already existed

Raises:



26
27
28
29
30
31
32
33
34
35
# File 'lib/pgmq/client/queue_management.rb', line 26

def create(queue_name, tune_autovacuum: false)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create($1::text)", [queue_name])
    apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum)
    !existed
  end
end

#create_fifo_index(queue_name) ⇒ void

This method returns an undefined value.

Creates the FIFO index on a queue’s table required for grouped reads

Grouped read operations (‘read_grouped`, `read_grouped_rr`, `read_grouped_head`) rely on this index for correct ordering and acceptable query performance. Without it, grouped reads will work but may be slow or return incorrect ordering at scale. The operation is idempotent - calling it on a queue that already has the index is safe.

Examples:

client.create("tasks")
client.create_fifo_index("tasks")

Parameters:

  • queue_name (String)

    name of the queue

Raises:



128
129
130
131
132
133
134
135
136
# File 'lib/pgmq/client/queue_management.rb', line 128

def create_fifo_index(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params("SELECT pgmq.create_fifo_index($1::text)", [queue_name])
  end

  nil
end

#create_fifo_indexes_allvoid

This method returns an undefined value.

Creates FIFO indexes on all existing queues

Convenience wrapper that calls ‘create_fifo_index` for every queue registered in `pgmq.meta`. Useful for one-time migrations when adding grouped reads to an existing deployment. The operation is idempotent.

Examples:

Migrate an existing deployment to use grouped reads

client.create_fifo_indexes_all

Raises:



148
149
150
151
152
153
154
# File 'lib/pgmq/client/queue_management.rb', line 148

def create_fifo_indexes_all
  with_connection do |conn|
    conn.exec("SELECT pgmq.create_fifo_indexes_all()")
  end

  nil
end

#create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", tune_autovacuum: false) ⇒ Boolean

Creates a partitioned queue

Requires pg_partman extension to be installed

Examples:

client.create_partitioned("big_queue",
  partition_interval: "daily",
  retention_interval: "7 days"
)  # => true

Parameters:

  • queue_name (String)

    name of the queue

  • partition_interval (String) (defaults to: "10000")

    partition interval (e.g., “daily”, “10000”)

  • retention_interval (String) (defaults to: "100000")

    retention interval (e.g., “7 days”, “100000”)

  • tune_autovacuum (Boolean, Hash) (defaults to: false)

    when truthy, tune autovacuum on the new queue’s tables after creation. Pass true for PGMQ-tuned defaults, or a Hash forwarded to Autovacuum#tune_autovacuum. Defaults to false. Note: storage parameters are set on the partitioned parent and do not cascade to partitions.

Returns:

  • (Boolean)

    true if queue was created, false if it already existed



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pgmq/client/queue_management.rb', line 54

def create_partitioned(
  queue_name,
  partition_interval: "10000",
  retention_interval: "100000",
  tune_autovacuum: false
)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params(
      "SELECT pgmq.create_partitioned($1::text, $2::text, $3::text)",
      [queue_name, partition_interval, retention_interval]
    )
    apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum)
    !existed
  end
end

#create_unlogged(queue_name, tune_autovacuum: false) ⇒ Boolean

Creates an unlogged queue for higher throughput (no crash recovery)

Examples:

client.create_unlogged("fast_queue")  # => true

Parameters:

  • queue_name (String)

    name of the queue

  • tune_autovacuum (Boolean, Hash) (defaults to: false)

    when truthy, tune autovacuum on the new queue’s tables after creation. Pass true for PGMQ-tuned defaults, or a Hash forwarded to Autovacuum#tune_autovacuum. Defaults to false.

Returns:

  • (Boolean)

    true if queue was created, false if it already existed



83
84
85
86
87
88
89
90
91
92
# File 'lib/pgmq/client/queue_management.rb', line 83

def create_unlogged(queue_name, tune_autovacuum: false)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create_unlogged($1::text)", [queue_name])
    apply_tune_autovacuum_option(conn, queue_name, tune_autovacuum)
    !existed
  end
end

#drop_queue(queue_name) ⇒ Boolean

Drops a queue and its archive table

Examples:

client.drop_queue("old_queue")

Parameters:

  • queue_name (String)

    name of the queue to drop

Returns:

  • (Boolean)

    true if queue was dropped



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/pgmq/client/queue_management.rb', line 101

def drop_queue(queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params("SELECT pgmq.drop_queue($1::text)", [queue_name])
  end

  return false if result.ntuples.zero?

  result[0]["drop_queue"] == "t"
end

#list_queuesArray<PGMQ::QueueMetadata>

Lists all queues

Examples:

queues = client.list_queues
queues.each { |q| puts q.queue_name }

Returns:



163
164
165
166
167
168
169
# File 'lib/pgmq/client/queue_management.rb', line 163

def list_queues
  result = with_connection do |conn|
    conn.exec("SELECT * FROM pgmq.list_queues()")
  end

  result.map { |row| QueueMetadata.new(row) }
end