Module: PGMQ::Client::Maintenance

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/maintenance.rb

Overview

Queue maintenance operations

This module handles queue maintenance tasks such as purging messages and detaching archive tables.

Instance Method Summary collapse

Instance Method Details

#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ void

Note:

Requires the ‘pg_partman` PostgreSQL extension. If pg_partman is not installed and the archive table exists, the call raises `PGMQ::Errors::ConnectionError`. If the archive table does not exist the call succeeds (returns nil) without touching pg_partman, so no extension is needed in that case.

This method returns an undefined value.

Converts a standard queue’s archive table to a pg_partman-managed partitioned table

Provides a migration path for queues originally created with ‘create` or `create_unlogged` whose archive tables have grown large enough to benefit from partitioning. The queue message table is not affected - only the archive table (`pgmq.a_<queue_name>`) is converted.

The operation renames the existing archive table to ‘pgmq.a_<queue_name>_old`, creates a new partitioned table with the same schema, and hands it over to pg_partman for lifecycle management. **Existing archived rows are left in the `_old` table and must be migrated manually** if visibility in the new partitioned archive is needed. If the archive table is already partitioned the function returns without error (idempotent). If the archive table does not exist it also returns without error.

Examples:

Convert with default partitioning (row-count based)

client.convert_archive_partitioned("orders")

Convert with time-based daily partitioning

client.convert_archive_partitioned("orders",
  partition_interval: "daily",
  retention_interval: "30 days"
)

Parameters:

  • queue_name (String)

    name of the queue whose archive table to convert

  • partition_interval (String) (defaults to: "10000")

    partition interval passed to pg_partman (default: “10000” rows or a time expression such as “daily” / “1 month”)

  • retention_interval (String) (defaults to: "100000")

    retention interval passed to pg_partman (default: “100000”)

  • leading_partition (Integer) (defaults to: 10)

    number of leading partitions pg_partman should pre-create (default: 10)

Raises:



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/pgmq/client/maintenance.rb', line 90

def convert_archive_partitioned(
  queue_name,
  partition_interval: "10000",
  retention_interval: "100000",
  leading_partition: 10
)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.convert_archive_partitioned($1::text, $2::text, $3::text, $4::integer)",
      [queue_name, partition_interval, retention_interval, leading_partition]
    )
  end

  nil
end

#disable_notify_insert(queue_name) ⇒ void

This method returns an undefined value.

Disables PostgreSQL NOTIFY for a queue

Examples:

client.disable_notify_insert("orders")

Parameters:

  • queue_name (String)

    name of the queue



163
164
165
166
167
168
169
170
171
# File 'lib/pgmq/client/maintenance.rb', line 163

def disable_notify_insert(queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params("SELECT pgmq.disable_notify_insert($1::text)", [queue_name])
  end

  nil
end

#enable_notify_insert(queue_name, throttle_interval_ms: 250) ⇒ void

This method returns an undefined value.

Enables PostgreSQL NOTIFY when messages are inserted into a queue

When enabled, PostgreSQL will send a NOTIFY event on message insert, allowing clients to use LISTEN instead of polling. The throttle interval prevents notification storms during high-volume inserts.

Examples:

Enable with default throttle (250ms)

client.enable_notify_insert("orders")

Enable with custom throttle (1 second)

client.enable_notify_insert("orders", throttle_interval_ms: 1000)

Disable throttling (notify on every insert)

client.enable_notify_insert("orders", throttle_interval_ms: 0)

Parameters:

  • queue_name (String)

    name of the queue

  • throttle_interval_ms (Integer) (defaults to: 250)

    minimum ms between notifications (default: 250)



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/pgmq/client/maintenance.rb', line 44

def enable_notify_insert(queue_name, throttle_interval_ms: 250)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.enable_notify_insert($1::text, $2::integer)",
      [queue_name, throttle_interval_ms]
    )
  end

  nil
end

#list_notify_insert_throttlesArray<PGMQ::NotifyThrottle>

Lists all queues that have a NOTIFY trigger enabled, with their throttle configuration

Returns one NotifyThrottle per queue that has had #enable_notify_insert called on it. Useful for auditing notification configuration across all queues at once.

Examples:

throttles = client.list_notify_insert_throttles
throttles.each do |t|
  puts "#{t.queue_name}: #{t.throttle_interval_ms}ms"
end

Returns:



120
121
122
123
124
125
126
# File 'lib/pgmq/client/maintenance.rb', line 120

def list_notify_insert_throttles
  result = with_connection do |conn|
    conn.exec("SELECT * FROM pgmq.list_notify_insert_throttles()")
  end

  result.map { |row| PGMQ::NotifyThrottle.new(row) }
end

#purge_queue(queue_name) ⇒ Integer

Purges all messages from a queue

Examples:

count = client.purge_queue("old_queue")
puts "Purged #{count} messages"

Parameters:

  • queue_name (String)

    name of the queue

Returns:

  • (Integer)

    number of messages purged



17
18
19
20
21
22
23
24
25
# File 'lib/pgmq/client/maintenance.rb', line 17

def purge_queue(queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params("SELECT pgmq.purge_queue($1::text)", [queue_name])
  end

  result[0]["purge_queue"]
end

#update_notify_insert(queue_name, throttle_interval_ms:) ⇒ void

This method returns an undefined value.

Updates the throttle interval for an already-enabled NOTIFY trigger

Changes how frequently PostgreSQL is allowed to fire a NOTIFY event on message insert, without having to disable and re-enable the trigger. The queue must already have notifications enabled via #enable_notify_insert.

Examples:

Tighten throttle to 100ms during high-throughput ingestion

client.update_notify_insert("orders", throttle_interval_ms: 100)

Remove throttling entirely

client.update_notify_insert("orders", throttle_interval_ms: 0)

Parameters:

  • queue_name (String)

    name of the queue

  • throttle_interval_ms (Integer)

    new minimum ms between notifications



143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/pgmq/client/maintenance.rb', line 143

def update_notify_insert(queue_name, throttle_interval_ms:)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.update_notify_insert($1::text, $2::integer)",
      [queue_name, throttle_interval_ms]
    )
  end

  nil
end

#wait_for_notify(queue_name, timeout: nil) ⇒ String?

Note:

Orchestration (retry loop, reconnect-on-drop, graceful shutdown) is the caller’s responsibility. This method is a thin primitive — it listens once, waits, and returns.

Blocks until a PostgreSQL NOTIFY arrives on the queue’s channel or the timeout expires.

PGMQ sends notifications on the channel ‘pgmq.q_<queue_name>.INSERT` whenever a message is inserted (requires #enable_notify_insert to be called first). This method issues `LISTEN`, waits for a notification via the `pg` gem’s ‘wait_for_notify`, then issues `UNLISTEN` before returning the connection to the pool.

Compared with Consumer#read_with_poll, which holds the connection inside a PL/pgSQL loop for the full poll window, ‘wait_for_notify` releases the connection the moment a notification arrives (or the timeout expires). This makes it more efficient under low message rates where the poll window would otherwise burn idle time.

The optional block receives ‘channel`, `backend_pid`, and `payload` when a notification arrives. The return value mirrors `PG::Connection#wait_for_notify`: the channel name string on success, or `nil` on timeout.

Examples:

Basic usage (wake up when a message arrives, then read it)

client.enable_notify_insert("orders")

loop do
  next unless client.wait_for_notify("orders", timeout: 5)
  msg = client.read("orders", vt: 30)
  process(msg) if msg
end

Block form (inspect notification metadata)

client.wait_for_notify("orders", timeout: 5) do |channel, pid, payload|
  puts "Notified on #{channel} by backend #{pid}"
end

Parameters:

  • queue_name (String)

    name of the queue (must have notifications enabled via #enable_notify_insert)

  • timeout (Numeric, nil) (defaults to: nil)

    seconds to wait; ‘nil` blocks indefinitely

Returns:

  • (String, nil)

    notification channel name, or ‘nil` if the timeout expired



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/pgmq/client/maintenance.rb', line 208

def wait_for_notify(queue_name, timeout: nil)
  validate_queue_name!(queue_name)
  # PGMQ trigger fires pg_notify('pgmq.q_<queue>.INSERT', NULL)
  channel = "pgmq.q_#{queue_name}.INSERT"

  with_connection do |conn|
    conn.exec("LISTEN \"#{channel}\"")
    begin
      conn.wait_for_notify(timeout) do |ch, pid, payload|
        yield ch, pid, payload if block_given?
      end
    ensure
      conn.exec("UNLISTEN \"#{channel}\"")
    end
  end
end