Class: Pgbus::Client

Inherits:
Object
  • Object
show all
Includes:
EnsureStreamQueue, NotifyStream, ReadAfter
Defined in:
lib/pgbus/client.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/client/notify_stream.rb,
lib/pgbus/client/ensure_stream_queue.rb

Defined Under Namespace

Modules: EnsureStreamQueue, NotifyStream, ReadAfter

Constant Summary collapse

NOTIFY_THROTTLE_MS =

Throttle window for PGMQ’s enable_notify_insert trigger. Postgres NOTIFYs are coalesced into one wake-up per window, so a value of 250ms means: at most 4 broadcasts/sec per queue, regardless of insert rate. The trigger is a Postgres-level concern; exposing it as a setting never came up in practice and changing it on the fly would require re-running the trigger DDL on every queue.

250

Constants included from ReadAfter

ReadAfter::DEFAULT_LIMIT

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from NotifyStream

#notify_stream

Methods included from EnsureStreamQueue

#ensure_stream_queue

Methods included from ReadAfter

#read_after, #stream_current_msg_id, #stream_oldest_msg_id

Constructor Details

#initialize(config = Pgbus.configuration) ⇒ Client

Returns a new instance of Client.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/pgbus/client.rb', line 28

# Builds a client around a PGMQ::Client configured from +config+.
#
# @param config [Object] configuration; this method reads its
#   connection_options, pool_timeout and (for dedicated connections)
#   resolved_pool_size
def initialize(config = Pgbus.configuration)
  # The PGMQ module has to exist before the gem is required so that
  # Zeitwerk's eager_load (invoked inside pgmq.rb) can resolve the constant.
  # Otherwise Ruby 4.0 + Zeitwerk 2.7.5 raises NameError, because eager_load
  # runs const_get(:Client) on PGMQ before the module is defined.
  PGMQ_REQUIRE_MUTEX.synchronize do
    Object.const_set(:PGMQ, Module.new) unless defined?(::PGMQ)
    require "pgmq"
  end

  @config = config
  conn_opts = config.connection_options
  @shared_connection = conn_opts.is_a?(Proc)

  # Shared path (Rails lambda, -> { AR::Base.connection.raw_connection }):
  #   the Proc hands back the same PG::Connection ActiveRecord uses, and
  #   PG::Connection (libpq) is not thread-safe — concurrent access causes
  #   segfaults and result corruption. So the pool is pinned to one slot and a
  #   mutex is created to serialize operations.
  # Dedicated path (String URL or Hash params):
  #   pgmq-ruby opens its own PG::Connection per pool slot, sharing nothing
  #   with ActiveRecord. The resolved pool size (auto-tuned from worker thread
  #   counts unless explicitly set) is used and pgmq-ruby's connection_pool
  #   handles concurrency, so no mutex is needed.
  pool_size = @shared_connection ? 1 : config.resolved_pool_size
  @pgmq = PGMQ::Client.new(conn_opts, pool_size: pool_size, pool_timeout: config.pool_timeout)
  @pgmq_mutex = (Mutex.new if @shared_connection)

  @queues_created = Concurrent::Map.new
  @stream_indexes_created = Concurrent::Map.new
  @queue_strategy = QueueFactory.for(config)
  @schema_ensured = false
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



15
16
17
# File 'lib/pgbus/client.rb', line 15

# Reader for the configuration object stored in @config.
def config = @config

#pgmq ⇒ Object (readonly)

Returns the value of attribute pgmq.



15
16
17
# File 'lib/pgbus/client.rb', line 15

# Reader for the underlying PGMQ client stored in @pgmq.
def pgmq = @pgmq

Instance Method Details

#archive_batch(queue_name, msg_ids, prefixed: true) ⇒ Object

Batch archive — moves multiple messages to the archive table in one call.



211
212
213
214
215
216
# File 'lib/pgbus/client.rb', line 211

# Archives several messages with a single @pgmq.archive_batch call.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param msg_ids [Array] ids forwarded unchanged to PGMQ
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.archive_batch returns
def archive_batch(queue_name, msg_ids, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.archive_batch(physical, msg_ids)
    end
  end
end

#archive_message(queue_name, msg_id, prefixed: true) ⇒ Object

Archive a message. Pass prefixed: false when queue_name is already the full PGMQ queue name.



203
204
205
206
207
208
# File 'lib/pgbus/client.rb', line 203

# Archives one message via @pgmq.archive.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param msg_id [Object] id forwarded unchanged to PGMQ
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.archive returns
def archive_message(queue_name, msg_id, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.archive(physical, msg_id)
    end
  end
end

#bind_topic(pattern, queue_name) ⇒ Object

Topic routing



439
440
441
442
443
444
445
# File 'lib/pgbus/client.rb', line 439

# Binds a topic pattern to a queue.
#
# The queue is ensured (ensure_queue with the logical name) inside the retry
# wrapper before @pgmq.bind_topic is called with the prefixed name.
#
# @param pattern [Object] topic pattern forwarded to PGMQ
# @param queue_name [String] logical queue name
# @return [Object] whatever @pgmq.bind_topic returns
def bind_topic(pattern, queue_name)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    ensure_queue(queue_name)
    synchronized do
      @pgmq.bind_topic(pattern, physical)
    end
  end
end

#close ⇒ Object



460
461
462
# File 'lib/pgbus/client.rb', line 460

# Closes the underlying PGMQ client inside a synchronized section.
#
# @return [Object] whatever @pgmq.close returns
def close
  synchronized do
    @pgmq.close
  end
end

#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ Object

— Archive partitioning (requires pg_partman extension) —



423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/pgbus/client.rb', line 423

# Converts a queue's archive to a partitioned layout by delegating to
# @pgmq.convert_archive_partitioned (the rendered docs note this requires the
# pg_partman extension).
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param partition_interval [String] forwarded unchanged
# @param retention_interval [String] forwarded unchanged
# @param leading_partition [Integer] forwarded unchanged
# @return [Object] whatever the PGMQ call returns
def convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000",
                                leading_partition: 10)
  physical = config.queue_name(queue_name)
  partition_opts = {
    partition_interval: partition_interval,
    retention_interval: retention_interval,
    leading_partition: leading_partition
  }

  with_stale_connection_retry do
    synchronized { @pgmq.convert_archive_partitioned(physical, **partition_opts) }
  end
end

#create_fifo_index(queue_name) ⇒ Object

— FIFO index management (PGMQ v1.11.0+) —



386
387
388
389
390
391
# File 'lib/pgbus/client.rb', line 386

# Creates the FIFO index for one queue via @pgmq.create_fifo_index.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @return [Object] whatever the PGMQ call returns
def create_fifo_index(queue_name)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    synchronized do
      @pgmq.create_fifo_index(physical)
    end
  end
end

#create_fifo_indexes_all ⇒ Object



393
394
395
396
397
# File 'lib/pgbus/client.rb', line 393

# Delegates to @pgmq.create_fifo_indexes_all inside the retry and
# synchronization wrappers.
#
# @return [Object] whatever the PGMQ call returns
def create_fifo_indexes_all
  with_stale_connection_retry do
    synchronized do
      @pgmq.create_fifo_indexes_all
    end
  end
end

#delete_batch(queue_name, msg_ids, prefixed: true) ⇒ Object

Batch delete — permanently removes multiple messages in one call.



219
220
221
222
223
224
# File 'lib/pgbus/client.rb', line 219

# Deletes several messages with a single @pgmq.delete_batch call.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param msg_ids [Array] ids forwarded unchanged to PGMQ
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.delete_batch returns
def delete_batch(queue_name, msg_ids, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.delete_batch(physical, msg_ids)
    end
  end
end

#delete_message(queue_name, msg_id, prefixed: true) ⇒ Object

Delete a message. Pass prefixed: false when queue_name is already the full PGMQ queue name (e.g. from priority sub-queues or dashboard).



194
195
196
197
198
199
# File 'lib/pgbus/client.rb', line 194

# Deletes one message via @pgmq.delete.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false (e.g. priority sub-queues or the dashboard)
# @param msg_id [Object] id forwarded unchanged to PGMQ
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.delete returns
def delete_message(queue_name, msg_id, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.delete(physical, msg_id)
    end
  end
end

#drop_queue(queue_name, prefixed: true) ⇒ Object



284
285
286
287
288
289
290
291
# File 'lib/pgbus/client.rb', line 284

# Drops a queue via @pgmq.drop_queue and then removes its entry from the
# @queues_created cache.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.drop_queue returns
def drop_queue(queue_name, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  # The cache entry is only cleared once the drop has returned; if the drop
  # raises, the tap block is never reached.
  with_stale_connection_retry { synchronized { @pgmq.drop_queue(physical) } }.tap do
    @queues_created.delete(physical)
  end
end

#ensure_all_queues ⇒ Object



70
71
72
73
74
# File 'lib/pgbus/client.rb', line 70

# Ensures every queue returned by collect_configured_queues, logging the
# list at info level first.
#
# @return [Object] the collection returned by collect_configured_queues
def ensure_all_queues
  configured = collect_configured_queues
  Pgbus.logger.info do
    "[Pgbus] Bootstrapping #{configured.size} queue(s): #{configured.join(", ")}"
  end

  configured.each do |logical_name|
    ensure_queue(logical_name)
  end
end

#ensure_dead_letter_queue(name) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pgbus/client.rb', line 76

# Creates the dead-letter queue for +name+ unless @queues_created already
# records it.
#
# @param name [String] queue name handed to config.dead_letter_queue_name
# @return [true, nil] nil when the cache already had a truthy entry;
#   otherwise the result of compute_if_absent (the block stores true)
def ensure_dead_letter_queue(name)
  dlq = config.dead_letter_queue_name(name)
  already_recorded = @queues_created[dlq]
  return if already_recorded

  # The cache is consulted a second time through compute_if_absent, so the
  # create/tune block runs only when the key is still absent at that point.
  @queues_created.compute_if_absent(dlq) do
    synchronized do
      @pgmq.create(dlq)
      tune_autovacuum(dlq)
    end

    true
  end
end

#ensure_queue(name) ⇒ Object



65
66
67
68
# File 'lib/pgbus/client.rb', line 65

# Ensures the PGMQ schema, then ensures each physical queue that the queue
# strategy maps +name+ to.
#
# @param name [String] queue name handed to the queue strategy
# @return [Object] the collection returned by physical_queue_names
def ensure_queue(name)
  ensure_pgmq_schema

  physical_names = @queue_strategy.physical_queue_names(name)
  physical_names.each do |physical|
    ensure_single_queue(physical)
  end
end

#list_notify_insert_throttles ⇒ Object



415
416
417
418
419
# File 'lib/pgbus/client.rb', line 415

# Delegates to @pgmq.list_notify_insert_throttles inside the retry and
# synchronization wrappers.
#
# @return [Object] whatever the PGMQ call returns
def list_notify_insert_throttles
  with_stale_connection_retry do
    synchronized do
      @pgmq.list_notify_insert_throttles
    end
  end
end

#list_queues ⇒ Object



271
272
273
274
275
# File 'lib/pgbus/client.rb', line 271

# Delegates to @pgmq.list_queues inside the retry and synchronization
# wrappers.
#
# @return [Object] whatever the PGMQ call returns
def list_queues
  with_stale_connection_retry do
    synchronized do
      @pgmq.list_queues
    end
  end
end

#message_exists?(queue_name, msg_id: nil, uniqueness_key: nil) ⇒ Boolean

Check whether a message exists in the given queue.

Pass either msg_id for a fast primary-key lookup, or uniqueness_key to scan the queue for any message whose payload carries that key in the pgbus_uniqueness_key JSONB field. The latter is used by the dispatcher reaper to determine if a uniqueness lock with msg_id=0 (placeholder) still has a corresponding queue message.

queue_name may be either a logical name (e.g. “default”) or an already prefixed physical name (e.g. “pgbus_default”). The client normalizes both.

Returns:

true  — the message definitely exists in the queue
false — the message definitely does not exist
nil   — could not determine because the queue table is missing (any other error is re-raised).
        Callers MUST treat nil as "exists" for safety.

Returns:

  • (Boolean)


309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/pgbus/client.rb', line 309

# Checks whether a message is present in a queue.
#
# Exactly one of +msg_id+ / +uniqueness_key+ must be given. The queue name is
# resolved with resolve_full_queue_name and sanitized before being handed to
# the lookup helper.
#
# @param queue_name [String] logical or already-prefixed queue name
# @param msg_id [#to_i, nil] message id to look up (converted with to_i)
# @param uniqueness_key [Object, nil] uniqueness key to search for
# @return [Object, nil] the lookup helper's result, or nil when the failure
#   is recognised as a missing queue table
# @raise [ArgumentError] when neither or both of msg_id / uniqueness_key are given
# @raise [StandardError] any other error is re-raised unchanged
def message_exists?(queue_name, msg_id: nil, uniqueness_key: nil)
  has_msg_id = !msg_id.nil?
  has_uniqueness_key = !uniqueness_key.nil?
  raise ArgumentError, "pass exactly one of msg_id or uniqueness_key" unless has_msg_id ^ has_uniqueness_key

  full_name = resolve_full_queue_name(queue_name)
  sanitized = QueueNameValidator.sanitize!(full_name)

  synchronized do
    with_raw_connection do |conn|
      if has_msg_id
        msg_id_present?(conn, sanitized, msg_id.to_i)
      else
        uniqueness_key_present?(conn, sanitized, uniqueness_key)
      end
    end
  end
rescue StandardError => e
  # Both exception classes are probed with defined? rather than named in the
  # rescue clause: Ruby evaluates rescue-clause constants when an exception
  # arrives, so naming ActiveRecord::StatementInvalid directly would raise
  # NameError (hiding the real error) whenever ActiveRecord is not loaded.
  table_missing =
    if defined?(ActiveRecord::StatementInvalid) && e.is_a?(ActiveRecord::StatementInvalid)
      undefined_table_error?(e)
    else
      defined?(PG::UndefinedTable) && e.is_a?(PG::UndefinedTable)
    end
  raise unless table_missing

  nil
end

#metrics(queue_name = nil) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
# File 'lib/pgbus/client.rb', line 259

# Fetches queue metrics from PGMQ.
#
# @param queue_name [String, nil] logical queue name; when nil (or false)
#   @pgmq.metrics_all is used instead of @pgmq.metrics
# @return [Object] whatever the PGMQ call returns
def metrics(queue_name = nil)
  with_stale_connection_retry do
    synchronized do
      queue_name ? @pgmq.metrics(config.queue_name(queue_name)) : @pgmq.metrics_all
    end
  end
end

#move_to_dead_letter(queue_name, message) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/pgbus/client.rb', line 244

# Moves a message to the queue's dead-letter queue.
#
# Inside one @pgmq.transaction the message body and headers are produced to
# the dead-letter queue and the original is deleted from the source queue.
# The dead-letter queue is ensured first, inside the retry wrapper.
#
# @param queue_name [String] logical queue name
# @param message [#message, #headers, #msg_id] message to relocate; msg_id
#   is converted with to_i
# @return [Object] whatever @pgmq.transaction returns
def move_to_dead_letter(queue_name, message)
  dead_letter = config.dead_letter_queue_name(queue_name)
  source = config.queue_name(queue_name)

  with_stale_connection_retry do
    ensure_dead_letter_queue(queue_name)

    synchronized do
      @pgmq.transaction do |tx|
        tx.produce(dead_letter, message.message, headers: message.headers)
        tx.delete(source, message.msg_id.to_i)
      end
    end
  end
end

#publish_to_topic(routing_key, payload, headers: nil, delay: 0) ⇒ Object



447
448
449
450
451
452
453
454
455
456
457
458
# File 'lib/pgbus/client.rb', line 447

# Publishes a payload to a topic via @pgmq.produce_topic.
#
# @param routing_key [Object] forwarded unchanged to PGMQ
# @param payload [Object] passed through serialize before sending
# @param headers [Object, nil] passed through serialize when truthy,
#   otherwise forwarded as-is
# @param delay [Object] forwarded unchanged (defaults to 0)
# @return [Object] whatever @pgmq.produce_topic returns
def publish_to_topic(routing_key, payload, headers: nil, delay: 0)
  with_stale_connection_retry do
    synchronized do
      body = serialize(payload)
      encoded_headers = headers && serialize(headers)
      @pgmq.produce_topic(routing_key, body, headers: encoded_headers, delay: delay)
    end
  end
end

#purge_archive(queue_name, older_than:, batch_size: 1000) ⇒ Object



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/pgbus/client.rb', line 336

# Deletes archived messages older than a cutoff, in batches.
#
# Each iteration deletes at most +batch_size+ rows from the queue's archive
# table (selected by ctid through a LIMITed subquery) and the loop stops as
# soon as a batch deletes fewer rows than +batch_size+.
#
# @param queue_name [String] logical queue name (prefixed via
#   config.queue_name, then sanitized before being interpolated into the SQL)
# @param older_than [Object] cutoff bound to the enqueued_at comparison
# @param batch_size [Integer] maximum rows deleted per statement; must be
#   a positive Integer
# @return [Integer] total number of rows deleted
# @raise [ArgumentError] when batch_size is not a positive Integer
def purge_archive(queue_name, older_than:, batch_size: 1000)
  # Validate before touching the database: with batch_size 0 every batch
  # deletes 0 rows and `0 < 0` never breaks the loop, and a nil/non-Integer
  # size would only fail after SQL had already been executed.
  unless batch_size.is_a?(Integer) && batch_size.positive?
    raise ArgumentError, "batch_size must be a positive Integer (got #{batch_size.inspect})"
  end

  full_name = config.queue_name(queue_name)
  sanitized = QueueNameValidator.sanitize!(full_name)
  total = 0

  sql = "DELETE FROM pgmq.a_#{sanitized} " \
        "WHERE ctid = ANY(ARRAY(SELECT ctid FROM pgmq.a_#{sanitized} WHERE enqueued_at < $1 LIMIT $2))"

  loop do
    deleted = synchronized do
      with_raw_connection do |conn|
        conn.exec_params(sql, [older_than, batch_size]).cmd_tuples
      end
    end
    total += deleted
    # A short batch means nothing older than the cutoff is left.
    break if deleted < batch_size
  end

  total
end

#purge_queue(queue_name, prefixed: true) ⇒ Object



277
278
279
280
281
282
# File 'lib/pgbus/client.rb', line 277

# Purges a queue via @pgmq.purge_queue.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.purge_queue returns
def purge_queue(queue_name, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.purge_queue(physical)
    end
  end
end

#read_batch(queue_name, qty:, vt: nil) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/pgbus/client.rb', line 119

# Reads up to +qty+ messages from a queue via @pgmq.read_batch, wrapped in a
# "pgbus.client.read_batch" instrumentation event.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param qty [Integer] forwarded unchanged to PGMQ
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Object] whatever @pgmq.read_batch returns
def read_batch(queue_name, qty:, vt: nil)
  physical = config.queue_name(queue_name)

  Instrumentation.instrument("pgbus.client.read_batch", queue: physical, qty: qty) do
    with_stale_connection_retry do
      synchronized do
        with_read_timeout do
          @pgmq.read_batch(physical, vt: vt || config.visibility_timeout, qty: qty)
        end
      end
    end
  end
end

#read_batch_prioritized(queue_name, qty:, vt: nil) ⇒ Object

Read from priority sub-queues, highest priority (p0) first. Returns [priority_queue_name, messages] pairs.



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/pgbus/client.rb', line 130

# Reads up to +qty+ messages, draining priority sub-queues in the order
# returned by config.priority_queue_names.
#
# When the queue strategy is not priority-based this falls back to
# read_batch and pairs every message with the prefixed queue name.
#
# @param queue_name [String] logical queue name
# @param qty [Integer] total number of messages wanted across sub-queues
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Array<Array>] [queue_name, message] pairs
def read_batch_prioritized(queue_name, qty:, vt: nil)
  unless @queue_strategy.priority?
    plain = read_batch(queue_name, qty: qty, vt: vt) || []
    return plain.map { |msg| [config.queue_name(queue_name), msg] }
  end

  budget = qty
  config.priority_queue_names(queue_name).each_with_object([]) do |sub_queue, pairs|
    # Stop visiting lower-priority sub-queues once the request is filled.
    break pairs if budget <= 0

    batch = Instrumentation.instrument("pgbus.client.read_batch", queue: sub_queue, qty: budget) do
      with_stale_connection_retry do
        synchronized do
          with_read_timeout { @pgmq.read_batch(sub_queue, vt: vt || config.visibility_timeout, qty: budget) }
        end
      end
    end
    batch ||= []

    batch.each { |msg| pairs << [sub_queue, msg] }
    budget -= batch.size
  end
end

#read_grouped(queue_name, qty:, vt: nil) ⇒ Object

— Grouped reads (PGMQ v1.11.0+) —



359
360
361
362
363
364
365
366
# File 'lib/pgbus/client.rb', line 359

# Reads up to +qty+ messages via @pgmq.read_grouped, wrapped in a
# "pgbus.client.read_grouped" instrumentation event.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param qty [Integer] forwarded unchanged to PGMQ
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Object] whatever @pgmq.read_grouped returns
def read_grouped(queue_name, qty:, vt: nil)
  physical = config.queue_name(queue_name)

  Instrumentation.instrument("pgbus.client.read_grouped", queue: physical, qty: qty) do
    with_stale_connection_retry do
      synchronized do
        with_read_timeout do
          @pgmq.read_grouped(physical, vt: vt || config.visibility_timeout, qty: qty)
        end
      end
    end
  end
end

#read_grouped_head(queue_name, qty:, vt: nil) ⇒ Object



377
378
379
380
381
382
# File 'lib/pgbus/client.rb', line 377

# Reads up to +qty+ messages via @pgmq.read_grouped_head. No instrumentation
# event is emitted by this method.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param qty [Integer] forwarded unchanged to PGMQ
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Object] whatever @pgmq.read_grouped_head returns
def read_grouped_head(queue_name, qty:, vt: nil)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    synchronized do
      with_read_timeout do
        @pgmq.read_grouped_head(physical, vt: vt || config.visibility_timeout, qty: qty)
      end
    end
  end
end

#read_grouped_rr(queue_name, qty:, vt: nil) ⇒ Object



368
369
370
371
372
373
374
375
# File 'lib/pgbus/client.rb', line 368

# Reads up to +qty+ messages via @pgmq.read_grouped_rr, wrapped in a
# "pgbus.client.read_grouped_rr" instrumentation event.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param qty [Integer] forwarded unchanged to PGMQ
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Object] whatever @pgmq.read_grouped_rr returns
def read_grouped_rr(queue_name, qty:, vt: nil)
  physical = config.queue_name(queue_name)

  Instrumentation.instrument("pgbus.client.read_grouped_rr", queue: physical, qty: qty) do
    with_stale_connection_retry do
      synchronized do
        with_read_timeout do
          @pgmq.read_grouped_rr(physical, vt: vt || config.visibility_timeout, qty: qty)
        end
      end
    end
  end
end

#read_message(queue_name, vt: nil) ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/pgbus/client.rb', line 110

# Reads from a queue via @pgmq.read, wrapped in a
# "pgbus.client.read_message" instrumentation event.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @return [Object] whatever @pgmq.read returns
def read_message(queue_name, vt: nil)
  physical = config.queue_name(queue_name)

  Instrumentation.instrument("pgbus.client.read_message", queue: physical) do
    with_stale_connection_retry do
      synchronized do
        with_read_timeout do
          @pgmq.read(physical, vt: vt || config.visibility_timeout)
        end
      end
    end
  end
end

#read_multi(queue_names, qty:, vt: nil, limit: nil) ⇒ Object

Read from multiple queues in a single SQL query (UNION ALL). Each returned message includes a queue_name field identifying its source. queue_names should be logical names (prefix is added automatically).

`qty` is the per-queue cap (pgmq-ruby semantics), so without `limit:` the caller receives up to `queue_count * qty` messages. Pass `limit:` to cap the total across all queues — required when feeding a fixed-size pool, otherwise the pool can overflow on multi-queue reads (issue #123).



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/pgbus/client.rb', line 179

# Reads from several queues with one @pgmq.read_multi call, wrapped in a
# "pgbus.client.read_multi" instrumentation event.
#
# @param queue_names [Array<String>] logical queue names; each is prefixed
#   via config.queue_name
# @param qty [Integer] forwarded unchanged (per-queue cap per the rendered docs)
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @param limit [Integer, nil] forwarded unchanged (overall cap per the
#   rendered docs)
# @return [Object] whatever @pgmq.read_multi returns
def read_multi(queue_names, qty:, vt: nil, limit: nil)
  physical_names = queue_names.map { |logical| config.queue_name(logical) }

  Instrumentation.instrument("pgbus.client.read_multi", queues: physical_names, qty: qty, limit: limit) do
    with_stale_connection_retry do
      synchronized do
        with_read_timeout do
          timeout = vt || config.visibility_timeout
          @pgmq.read_multi(physical_names, vt: timeout, qty: qty, limit: limit)
        end
      end
    end
  end
end

#read_with_poll(queue_name, qty:, vt: nil, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/pgbus/client.rb', line 156

# Reads via @pgmq.read_with_poll, forwarding the polling options unchanged.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param qty [Integer] forwarded unchanged to PGMQ
# @param vt [Object, nil] visibility timeout; falls back to
#   config.visibility_timeout when nil
# @param max_poll_seconds [Integer] forwarded unchanged (defaults to 5)
# @param poll_interval_ms [Integer] forwarded unchanged (defaults to 100)
# @return [Object] whatever @pgmq.read_with_poll returns
def read_with_poll(queue_name, qty:, vt: nil, max_poll_seconds: 5, poll_interval_ms: 100)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    synchronized do
      poll_opts = {
        vt: vt || config.visibility_timeout,
        qty: qty,
        max_poll_seconds: max_poll_seconds,
        poll_interval_ms: poll_interval_ms
      }
      @pgmq.read_with_poll(physical, **poll_opts)
    end
  end
end

#send_batch(queue_name, payloads, headers: nil, delay: 0) ⇒ Object



99
100
101
102
103
104
105
106
107
108
# File 'lib/pgbus/client.rb', line 99

# Sends several payloads with one @pgmq.produce_batch call, wrapped in a
# "pgbus.client.send_batch" instrumentation event.
#
# Payloads and headers are serialized once (serialize_batch) before the
# retry wrapper; the queue is ensured inside it.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param payloads [#size] payloads handed to serialize_batch
# @param headers [Object, nil] headers handed to serialize_batch
# @param delay [Object] forwarded unchanged (defaults to 0)
# @return [Object] whatever @pgmq.produce_batch returns
def send_batch(queue_name, payloads, headers: nil, delay: 0)
  physical = config.queue_name(queue_name)
  bodies, header_rows = serialize_batch(payloads, headers)

  Instrumentation.instrument("pgbus.client.send_batch", queue: physical, size: payloads.size) do
    with_stale_connection_retry do
      ensure_queue(queue_name)
      synchronized do
        @pgmq.produce_batch(physical, bodies, headers: header_rows, delay: delay)
      end
    end
  end
end

#send_message(queue_name, payload, headers: nil, delay: 0, priority: nil) ⇒ Object



89
90
91
92
93
94
95
96
97
# File 'lib/pgbus/client.rb', line 89

# Sends one payload via @pgmq.produce, wrapped in a
# "pgbus.client.send_message" instrumentation event.
#
# The destination comes from the queue strategy's target_queue; the queue is
# ensured (by logical name) inside the retry wrapper before producing.
#
# @param queue_name [String] logical queue name
# @param payload [Object] passed through serialize before sending
# @param headers [Object, nil] passed through serialize when truthy,
#   otherwise forwarded as-is
# @param delay [Object] forwarded unchanged (defaults to 0)
# @param priority [Object, nil] handed to the queue strategy
# @return [Object] whatever @pgmq.produce returns
def send_message(queue_name, payload, headers: nil, delay: 0, priority: nil)
  destination = @queue_strategy.target_queue(queue_name, priority)

  Instrumentation.instrument("pgbus.client.send_message", queue: destination) do
    with_stale_connection_retry do
      ensure_queue(queue_name)
      synchronized do
        body = serialize(payload)
        encoded_headers = headers && serialize(headers)
        @pgmq.produce(destination, body, headers: encoded_headers, delay: delay)
      end
    end
  end
end

#set_visibility_timeout(queue_name, msg_id, vt:, prefixed: true) ⇒ Object

Set visibility timeout. Pass prefixed: false when queue_name is already the full PGMQ queue name.



228
229
230
231
232
233
# File 'lib/pgbus/client.rb', line 228

# Sets a message's visibility timeout via @pgmq.set_vt.
#
# @param queue_name [String] logical queue name, or the full PGMQ name when
#   +prefixed+ is false
# @param msg_id [Object] id forwarded unchanged to PGMQ
# @param vt [Object] visibility timeout forwarded unchanged
# @param prefixed [Boolean] when true, the name is expanded through
#   config.queue_name first
# @return [Object] whatever @pgmq.set_vt returns
def set_visibility_timeout(queue_name, msg_id, vt:, prefixed: true)
  physical = queue_name
  physical = config.queue_name(queue_name) if prefixed

  with_stale_connection_retry do
    synchronized do
      @pgmq.set_vt(physical, msg_id, vt: vt)
    end
  end
end

#transaction(&block) ⇒ Object

Open a PGMQ transaction. The caller block may run twice if the first attempt hits a pre-flight stale-connection error — safe because no SQL was sent on the first attempt (the connection was dead before the BEGIN).



238
239
240
241
242
# File 'lib/pgbus/client.rb', line 238

# Opens a PGMQ transaction and hands it to the caller's block.
#
# Because the call sits inside with_stale_connection_retry, the block may be
# invoked a second time when the first attempt hits a pre-flight
# stale-connection error (per the rendered docs, no SQL was sent in that case).
#
# @yield forwarded as-is to @pgmq.transaction
# @return [Object] whatever @pgmq.transaction returns
def transaction(&block)
  with_stale_connection_retry do
    synchronized do
      @pgmq.transaction(&block)
    end
  end
end

#update_notify_insert(queue_name, throttle_interval_ms:) ⇒ Object



408
409
410
411
412
413
# File 'lib/pgbus/client.rb', line 408

# Updates a queue's notify-insert throttle via @pgmq.update_notify_insert.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param throttle_interval_ms [Object] forwarded unchanged to PGMQ
# @return [Object] whatever the PGMQ call returns
def update_notify_insert(queue_name, throttle_interval_ms:)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    synchronized do
      @pgmq.update_notify_insert(physical, throttle_interval_ms: throttle_interval_ms)
    end
  end
end

#wait_for_notify(queue_name, timeout: nil, &block) ⇒ Object

— LISTEN/NOTIFY management (PGMQ v1.11.0+) —



401
402
403
404
405
406
# File 'lib/pgbus/client.rb', line 401

# Delegates to @pgmq.wait_for_notify for one queue, forwarding the timeout
# and the caller's block.
#
# @param queue_name [String] logical queue name (prefixed via config.queue_name)
# @param timeout [Object, nil] forwarded unchanged to PGMQ
# @yield forwarded as-is to @pgmq.wait_for_notify
# @return [Object] whatever the PGMQ call returns
def wait_for_notify(queue_name, timeout: nil, &block)
  physical = config.queue_name(queue_name)

  with_stale_connection_retry do
    synchronized do
      @pgmq.wait_for_notify(physical, timeout: timeout, &block)
    end
  end
end