Module: PGMQ::Client::QueueManagement

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/queue_management.rb

Overview

Queue management operations (create, drop, list)

This module handles all queue lifecycle operations including creating queues (standard, partitioned, unlogged), dropping queues, and listing existing queues.

Instance Method Summary collapse

Instance Method Details

#create(queue_name) ⇒ Boolean

Creates a new queue

Examples:

client.create("orders")  # => true (created)
client.create("orders")  # => false (already exists)

Parameters:

  • queue_name (String)

    name of the queue to create

Returns:

  • (Boolean)

    true if queue was created, false if it already existed

Raises:



20
21
22
23
24
25
26
27
28
# File 'lib/pgmq/client/queue_management.rb', line 20

def create(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create($1::text)", [queue_name])
    !existed
  end
end

#create_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000") ⇒ Boolean

Creates a partitioned queue

Requires pg_partman extension to be installed

Examples:

client.create_partitioned("big_queue",
  partition_interval: "daily",
  retention_interval: "7 days"
)  # => true

Parameters:

  • queue_name (String)

    name of the queue

  • partition_interval (String) (defaults to: "10000")

    partition interval (e.g., “daily”, “10000”)

  • retention_interval (String) (defaults to: "100000")

    retention interval (e.g., “7 days”, “100000”)

Returns:

  • (Boolean)

    true if queue was created, false if it already existed



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/pgmq/client/queue_management.rb', line 44

def create_partitioned(
  queue_name,
  partition_interval: "10000",
  retention_interval: "100000"
)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params(
      "SELECT pgmq.create_partitioned($1::text, $2::text, $3::text)",
      [queue_name, partition_interval, retention_interval]
    )
    !existed
  end
end

#create_unlogged(queue_name) ⇒ Boolean

Creates an unlogged queue for higher throughput (no crash recovery)

Examples:

client.create_unlogged("fast_queue")  # => true

Parameters:

  • queue_name (String)

    name of the queue

Returns:

  • (Boolean)

    true if queue was created, false if it already existed



68
69
70
71
72
73
74
75
76
# File 'lib/pgmq/client/queue_management.rb', line 68

def create_unlogged(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    existed = queue_exists?(conn, queue_name)
    conn.exec_params("SELECT pgmq.create_unlogged($1::text)", [queue_name])
    !existed
  end
end

#drop_queue(queue_name) ⇒ Boolean

Drops a queue and its archive table

Examples:

client.drop_queue("old_queue")

Parameters:

  • queue_name (String)

    name of the queue to drop

Returns:

  • (Boolean)

    true if queue was dropped



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/pgmq/client/queue_management.rb', line 85

def drop_queue(queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params("SELECT pgmq.drop_queue($1::text)", [queue_name])
  end

  return false if result.ntuples.zero?

  result[0]["drop_queue"] == "t"
end

#list_queuesArray<PGMQ::QueueMetadata>

Lists all queues

Examples:

queues = client.list_queues
queues.each { |q| puts q.queue_name }

Returns:



104
105
106
107
108
109
110
# File 'lib/pgmq/client/queue_management.rb', line 104

def list_queues
  result = with_connection do |conn|
    conn.exec("SELECT * FROM pgmq.list_queues()")
  end

  result.map { |row| QueueMetadata.new(row) }
end