Class: Pgbus::Client
- Inherits:
-
Object
- Object
- Pgbus::Client
- Includes:
- EnsureStreamQueue, NotifyStream, ReadAfter
- Defined in:
- lib/pgbus/client.rb,
lib/pgbus/client/read_after.rb,
lib/pgbus/client/notify_stream.rb,
lib/pgbus/client/ensure_stream_queue.rb
Defined Under Namespace
Modules: EnsureStreamQueue, NotifyStream, ReadAfter
Constant Summary collapse
- NOTIFY_THROTTLE_MS =
Throttle window for PGMQ’s enable_notify_insert trigger. Postgres NOTIFYs are coalesced into one wake-up per window, so a value of 250ms means: at most 4 broadcasts/sec per queue, regardless of insert rate. The trigger is a Postgres-level concern; exposing it as a setting never came up in practice and changing it on the fly would require re-running the trigger DDL on every queue.
250
Constants included from ReadAfter
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#pgmq ⇒ Object
readonly
Returns the value of attribute pgmq.
Instance Method Summary collapse
-
#archive_batch(queue_name, msg_ids, prefixed: true) ⇒ Object
Batch archive — moves multiple messages to the archive table in one call.
-
#archive_message(queue_name, msg_id, prefixed: true) ⇒ Object
Archive a message.
-
#bind_topic(pattern, queue_name) ⇒ Object
Topic routing.
- #close ⇒ Object
-
#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ Object
— Archive partitioning (requires pg_partman extension) —.
-
#create_fifo_index(queue_name) ⇒ Object
— FIFO index management (PGMQ v1.11.0+) —.
- #create_fifo_indexes_all ⇒ Object
-
#delete_batch(queue_name, msg_ids, prefixed: true) ⇒ Object
Batch delete — permanently removes multiple messages in one call.
-
#delete_message(queue_name, msg_id, prefixed: true) ⇒ Object
Delete a message.
- #drop_queue(queue_name, prefixed: true) ⇒ Object
- #ensure_all_queues ⇒ Object
- #ensure_dead_letter_queue(name) ⇒ Object
- #ensure_queue(name) ⇒ Object
-
#initialize(config = Pgbus.configuration) ⇒ Client
constructor
A new instance of Client.
- #list_notify_insert_throttles ⇒ Object
- #list_queues ⇒ Object
-
#message_exists?(queue_name, msg_id: nil, uniqueness_key: nil) ⇒ Boolean
Check whether a message exists in the given queue.
- #metrics(queue_name = nil) ⇒ Object
- #move_to_dead_letter(queue_name, message) ⇒ Object
- #publish_to_topic(routing_key, payload, headers: nil, delay: 0) ⇒ Object
- #purge_archive(queue_name, older_than:, batch_size: 1000) ⇒ Object
- #purge_queue(queue_name, prefixed: true) ⇒ Object
- #read_batch(queue_name, qty:, vt: nil) ⇒ Object
-
#read_batch_prioritized(queue_name, qty:, vt: nil) ⇒ Object
Read from priority sub-queues, highest priority (p0) first.
-
#read_grouped(queue_name, qty:, vt: nil) ⇒ Object
— Grouped reads (PGMQ v1.11.0+) —.
- #read_grouped_head(queue_name, qty:, vt: nil) ⇒ Object
- #read_grouped_rr(queue_name, qty:, vt: nil) ⇒ Object
- #read_message(queue_name, vt: nil) ⇒ Object
-
#read_multi(queue_names, qty:, vt: nil, limit: nil) ⇒ Object
Read from multiple queues in a single SQL query (UNION ALL).
- #read_with_poll(queue_name, qty:, vt: nil, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Object
- #send_batch(queue_name, payloads, headers: nil, delay: 0) ⇒ Object
- #send_message(queue_name, payload, headers: nil, delay: 0, priority: nil) ⇒ Object
-
#set_visibility_timeout(queue_name, msg_id, vt:, prefixed: true) ⇒ Object
Set visibility timeout.
-
#transaction(&block) ⇒ Object
Open a PGMQ transaction.
- #update_notify_insert(queue_name, throttle_interval_ms:) ⇒ Object
-
#wait_for_notify(queue_name, timeout: nil, &block) ⇒ Object
— LISTEN/NOTIFY management (PGMQ v1.11.0+) —.
Methods included from NotifyStream
Methods included from EnsureStreamQueue
Methods included from ReadAfter
#read_after, #stream_current_msg_id, #stream_oldest_msg_id
Constructor Details
#initialize(config = Pgbus.configuration) ⇒ Client
Returns a new instance of Client.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/pgbus/client.rb', line 28 def initialize(config = Pgbus.configuration) # Define the PGMQ module before requiring the gem so that Zeitwerk's # eager_load (called inside pgmq.rb) can resolve the constant. # Without this, Ruby 4.0 + Zeitwerk 2.7.5 raises NameError because # eager_load runs const_get(:Client) on PGMQ before the module is defined. PGMQ_REQUIRE_MUTEX.synchronize do Object.const_set(:PGMQ, Module.new) unless defined?(::PGMQ) require "pgmq" end @config = config conn_opts = config. @shared_connection = conn_opts.is_a?(Proc) if @shared_connection # When using the Rails lambda path (-> { AR::Base.connection.raw_connection }), # the Proc returns the same underlying PG::Connection that ActiveRecord uses. # PG::Connection (libpq) is not thread-safe — concurrent access causes # segfaults and result corruption. Force pool_size=1 and serialize all # operations through a mutex. @pgmq = PGMQ::Client.new(conn_opts, pool_size: 1, pool_timeout: config.pool_timeout) @pgmq_mutex = Mutex.new else # With a String URL or Hash params, pgmq-ruby creates its own dedicated # PG::Connection per pool slot — no shared state with ActiveRecord. # Use the resolved pool size (auto-tuned from worker thread counts # unless explicitly set) and let pgmq-ruby's connection_pool handle # concurrency internally (no mutex needed). @pgmq = PGMQ::Client.new(conn_opts, pool_size: config.resolved_pool_size, pool_timeout: config.pool_timeout) @pgmq_mutex = nil end @queues_created = Concurrent::Map.new @stream_indexes_created = Concurrent::Map.new @queue_strategy = QueueFactory.for(config) @schema_ensured = false end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
15 16 17 |
# File 'lib/pgbus/client.rb', line 15 def config @config end |
#pgmq ⇒ Object (readonly)
Returns the value of attribute pgmq.
15 16 17 |
# File 'lib/pgbus/client.rb', line 15 def pgmq @pgmq end |
Instance Method Details
#archive_batch(queue_name, msg_ids, prefixed: true) ⇒ Object
Batch archive — moves multiple messages to the archive table in one call.
211 212 213 214 215 216 |
# File 'lib/pgbus/client.rb', line 211 def archive_batch(queue_name, msg_ids, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.archive_batch(name, msg_ids) } end end |
#archive_message(queue_name, msg_id, prefixed: true) ⇒ Object
Archive a message. Pass prefixed: false when queue_name is already the full PGMQ queue name.
203 204 205 206 207 208 |
# File 'lib/pgbus/client.rb', line 203 def (queue_name, msg_id, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.archive(name, msg_id) } end end |
#bind_topic(pattern, queue_name) ⇒ Object
Topic routing
439 440 441 442 443 444 445 |
# File 'lib/pgbus/client.rb', line 439 def bind_topic(pattern, queue_name) full_name = config.queue_name(queue_name) with_stale_connection_retry do ensure_queue(queue_name) synchronized { @pgmq.bind_topic(pattern, full_name) } end end |
#close ⇒ Object
460 461 462 |
# File 'lib/pgbus/client.rb', line 460 def close synchronized { @pgmq.close } end |
#convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) ⇒ Object
— Archive partitioning (requires pg_partman extension) —
423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/pgbus/client.rb', line 423 def convert_archive_partitioned(queue_name, partition_interval: "10000", retention_interval: "100000", leading_partition: 10) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized do @pgmq.convert_archive_partitioned( full_name, partition_interval: partition_interval, retention_interval: retention_interval, leading_partition: leading_partition ) end end end |
#create_fifo_index(queue_name) ⇒ Object
— FIFO index management (PGMQ v1.11.0+) —
386 387 388 389 390 391 |
# File 'lib/pgbus/client.rb', line 386 def create_fifo_index(queue_name) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized { @pgmq.create_fifo_index(full_name) } end end |
#create_fifo_indexes_all ⇒ Object
393 394 395 396 397 |
# File 'lib/pgbus/client.rb', line 393 def create_fifo_indexes_all with_stale_connection_retry do synchronized { @pgmq.create_fifo_indexes_all } end end |
#delete_batch(queue_name, msg_ids, prefixed: true) ⇒ Object
Batch delete — permanently removes multiple messages in one call.
219 220 221 222 223 224 |
# File 'lib/pgbus/client.rb', line 219 def delete_batch(queue_name, msg_ids, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.delete_batch(name, msg_ids) } end end |
#delete_message(queue_name, msg_id, prefixed: true) ⇒ Object
Delete a message. Pass prefixed: false when queue_name is already the full PGMQ queue name (e.g. from priority sub-queues or dashboard).
194 195 196 197 198 199 |
# File 'lib/pgbus/client.rb', line 194 def (queue_name, msg_id, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.delete(name, msg_id) } end end |
#drop_queue(queue_name, prefixed: true) ⇒ Object
284 285 286 287 288 289 290 291 |
# File 'lib/pgbus/client.rb', line 284 def drop_queue(queue_name, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name result = with_stale_connection_retry do synchronized { @pgmq.drop_queue(name) } end @queues_created.delete(name) result end |
#ensure_all_queues ⇒ Object
70 71 72 73 74 |
# File 'lib/pgbus/client.rb', line 70 def ensure_all_queues queue_names = collect_configured_queues Pgbus.logger.info { "[Pgbus] Bootstrapping #{queue_names.size} queue(s): #{queue_names.join(", ")}" } queue_names.each { |name| ensure_queue(name) } end |
#ensure_dead_letter_queue(name) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pgbus/client.rb', line 76 def ensure_dead_letter_queue(name) dlq_name = config.dead_letter_queue_name(name) return if @queues_created[dlq_name] @queues_created.compute_if_absent(dlq_name) do synchronized do @pgmq.create(dlq_name) tune_autovacuum(dlq_name) end true end end |
#ensure_queue(name) ⇒ Object
65 66 67 68 |
# File 'lib/pgbus/client.rb', line 65 def ensure_queue(name) ensure_pgmq_schema @queue_strategy.physical_queue_names(name).each { |pq| ensure_single_queue(pq) } end |
#list_notify_insert_throttles ⇒ Object
415 416 417 418 419 |
# File 'lib/pgbus/client.rb', line 415 def list_notify_insert_throttles with_stale_connection_retry do synchronized { @pgmq.list_notify_insert_throttles } end end |
#list_queues ⇒ Object
271 272 273 274 275 |
# File 'lib/pgbus/client.rb', line 271 def list_queues with_stale_connection_retry do synchronized { @pgmq.list_queues } end end |
#message_exists?(queue_name, msg_id: nil, uniqueness_key: nil) ⇒ Boolean
Check whether a message exists in the given queue.
Pass either msg_id for a fast primary-key lookup, or uniqueness_key to scan the queue for any message whose payload carries that key in the pgbus_uniqueness_key JSONB field. The latter is used by the dispatcher reaper to determine if a uniqueness lock with msg_id=0 (placeholder) still has a corresponding queue message.
queue_name may be either a logical name (e.g. “default”) or an already prefixed physical name (e.g. “pgbus_default”). The client normalizes both.
Returns:
true — the message definitely exists in the queue
false — the message definitely does not exist
nil — could not determine (e.g. queue table missing or unknown error).
Callers MUST treat nil as "exists" for safety.
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/pgbus/client.rb', line 309 def (queue_name, msg_id: nil, uniqueness_key: nil) has_msg_id = !msg_id.nil? has_uniqueness_key = !uniqueness_key.nil? raise ArgumentError, "pass exactly one of msg_id or uniqueness_key" unless has_msg_id ^ has_uniqueness_key full_name = resolve_full_queue_name(queue_name) sanitized = QueueNameValidator.sanitize!(full_name) synchronized do with_raw_connection do |conn| if has_msg_id msg_id_present?(conn, sanitized, msg_id.to_i) else uniqueness_key_present?(conn, sanitized, uniqueness_key) end end end rescue ActiveRecord::StatementInvalid => e raise unless undefined_table_error?(e) nil rescue StandardError => e raise unless defined?(PG::UndefinedTable) && e.is_a?(PG::UndefinedTable) nil end |
#metrics(queue_name = nil) ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/pgbus/client.rb', line 259 def metrics(queue_name = nil) with_stale_connection_retry do synchronized do if queue_name @pgmq.metrics(config.queue_name(queue_name)) else @pgmq.metrics_all end end end end |
#move_to_dead_letter(queue_name, message) ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/pgbus/client.rb', line 244 def move_to_dead_letter(queue_name, ) dlq_name = config.dead_letter_queue_name(queue_name) full_queue = config.queue_name(queue_name) with_stale_connection_retry do ensure_dead_letter_queue(queue_name) synchronized do @pgmq.transaction do |txn| txn.produce(dlq_name, ., headers: .headers) txn.delete(full_queue, .msg_id.to_i) end end end end |
#publish_to_topic(routing_key, payload, headers: nil, delay: 0) ⇒ Object
447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'lib/pgbus/client.rb', line 447 def publish_to_topic(routing_key, payload, headers: nil, delay: 0) with_stale_connection_retry do synchronized do @pgmq.produce_topic( routing_key, serialize(payload), headers: headers && serialize(headers), delay: delay ) end end end |
#purge_archive(queue_name, older_than:, batch_size: 1000) ⇒ Object
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/pgbus/client.rb', line 336 def purge_archive(queue_name, older_than:, batch_size: 1000) full_name = config.queue_name(queue_name) sanitized = QueueNameValidator.sanitize!(full_name) total = 0 sql = "DELETE FROM pgmq.a_#{sanitized} " \ "WHERE ctid = ANY(ARRAY(SELECT ctid FROM pgmq.a_#{sanitized} WHERE enqueued_at < $1 LIMIT $2))" loop do deleted = synchronized do with_raw_connection do |conn| conn.exec_params(sql, [older_than, batch_size]).cmd_tuples end end total += deleted break if deleted < batch_size end total end |
#purge_queue(queue_name, prefixed: true) ⇒ Object
277 278 279 280 281 282 |
# File 'lib/pgbus/client.rb', line 277 def purge_queue(queue_name, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.purge_queue(name) } end end |
#read_batch(queue_name, qty:, vt: nil) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/pgbus/client.rb', line 119 def read_batch(queue_name, qty:, vt: nil) full_name = config.queue_name(queue_name) Instrumentation.instrument("pgbus.client.read_batch", queue: full_name, qty: qty) do with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read_batch(full_name, vt: vt || config.visibility_timeout, qty: qty) } } end end end |
#read_batch_prioritized(queue_name, qty:, vt: nil) ⇒ Object
Read from priority sub-queues, highest priority (p0) first. Returns [priority_queue_name, messages] pairs.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/pgbus/client.rb', line 130 def read_batch_prioritized(queue_name, qty:, vt: nil) unless @queue_strategy.priority? return (read_batch(queue_name, qty: qty, vt: vt) || []).map do |m| [config.queue_name(queue_name), m] end end remaining = qty results = [] config.priority_queue_names(queue_name).each do |pq_name| break if remaining <= 0 msgs = Instrumentation.instrument("pgbus.client.read_batch", queue: pq_name, qty: remaining) do with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read_batch(pq_name, vt: vt || config.visibility_timeout, qty: remaining) } } end end || [] msgs.each { |m| results << [pq_name, m] } remaining -= msgs.size end results end |
#read_grouped(queue_name, qty:, vt: nil) ⇒ Object
— Grouped reads (PGMQ v1.11.0+) —
359 360 361 362 363 364 365 366 |
# File 'lib/pgbus/client.rb', line 359 def read_grouped(queue_name, qty:, vt: nil) full_name = config.queue_name(queue_name) Instrumentation.instrument("pgbus.client.read_grouped", queue: full_name, qty: qty) do with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read_grouped(full_name, vt: vt || config.visibility_timeout, qty: qty) } } end end end |
#read_grouped_head(queue_name, qty:, vt: nil) ⇒ Object
377 378 379 380 381 382 |
# File 'lib/pgbus/client.rb', line 377 def read_grouped_head(queue_name, qty:, vt: nil) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read_grouped_head(full_name, vt: vt || config.visibility_timeout, qty: qty) } } end end |
#read_grouped_rr(queue_name, qty:, vt: nil) ⇒ Object
368 369 370 371 372 373 374 375 |
# File 'lib/pgbus/client.rb', line 368 def read_grouped_rr(queue_name, qty:, vt: nil) full_name = config.queue_name(queue_name) Instrumentation.instrument("pgbus.client.read_grouped_rr", queue: full_name, qty: qty) do with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read_grouped_rr(full_name, vt: vt || config.visibility_timeout, qty: qty) } } end end end |
#read_message(queue_name, vt: nil) ⇒ Object
110 111 112 113 114 115 116 117 |
# File 'lib/pgbus/client.rb', line 110 def (queue_name, vt: nil) full_name = config.queue_name(queue_name) Instrumentation.instrument("pgbus.client.read_message", queue: full_name) do with_stale_connection_retry do synchronized { with_read_timeout { @pgmq.read(full_name, vt: vt || config.visibility_timeout) } } end end end |
#read_multi(queue_names, qty:, vt: nil, limit: nil) ⇒ Object
Read from multiple queues in a single SQL query (UNION ALL). Each returned message includes a queue_name field identifying its source. queue_names should be logical names (prefix is added automatically).
‘qty` is the per-queue cap (pgmq-ruby semantics), so without `limit:` the caller receives up to `queue_count * qty` messages. Pass `limit:` to cap the total across all queues — required when feeding a fixed-size pool, otherwise the pool can overflow on multi-queue reads (issue #123).
179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/pgbus/client.rb', line 179 def read_multi(queue_names, qty:, vt: nil, limit: nil) full_names = queue_names.map { |q| config.queue_name(q) } Instrumentation.instrument("pgbus.client.read_multi", queues: full_names, qty: qty, limit: limit) do with_stale_connection_retry do synchronized do with_read_timeout do @pgmq.read_multi(full_names, vt: vt || config.visibility_timeout, qty: qty, limit: limit) end end end end end |
#read_with_poll(queue_name, qty:, vt: nil, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/pgbus/client.rb', line 156 def read_with_poll(queue_name, qty:, vt: nil, max_poll_seconds: 5, poll_interval_ms: 100) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized do @pgmq.read_with_poll( full_name, vt: vt || config.visibility_timeout, qty: qty, max_poll_seconds: max_poll_seconds, poll_interval_ms: poll_interval_ms ) end end end |
#send_batch(queue_name, payloads, headers: nil, delay: 0) ⇒ Object
99 100 101 102 103 104 105 106 107 108 |
# File 'lib/pgbus/client.rb', line 99 def send_batch(queue_name, payloads, headers: nil, delay: 0) full_name = config.queue_name(queue_name) serialized, serialized_headers = serialize_batch(payloads, headers) Instrumentation.instrument("pgbus.client.send_batch", queue: full_name, size: payloads.size) do with_stale_connection_retry do ensure_queue(queue_name) synchronized { @pgmq.produce_batch(full_name, serialized, headers: serialized_headers, delay: delay) } end end end |
#send_message(queue_name, payload, headers: nil, delay: 0, priority: nil) ⇒ Object
89 90 91 92 93 94 95 96 97 |
# File 'lib/pgbus/client.rb', line 89 def (queue_name, payload, headers: nil, delay: 0, priority: nil) target = @queue_strategy.target_queue(queue_name, priority) Instrumentation.instrument("pgbus.client.send_message", queue: target) do with_stale_connection_retry do ensure_queue(queue_name) synchronized { @pgmq.produce(target, serialize(payload), headers: headers && serialize(headers), delay: delay) } end end end |
#set_visibility_timeout(queue_name, msg_id, vt:, prefixed: true) ⇒ Object
Set visibility timeout. Pass prefixed: false when queue_name is already the full PGMQ queue name.
228 229 230 231 232 233 |
# File 'lib/pgbus/client.rb', line 228 def set_visibility_timeout(queue_name, msg_id, vt:, prefixed: true) name = prefixed ? config.queue_name(queue_name) : queue_name with_stale_connection_retry do synchronized { @pgmq.set_vt(name, msg_id, vt: vt) } end end |
#transaction(&block) ⇒ Object
Open a PGMQ transaction. The caller block may run twice if the first attempt hits a pre-flight stale-connection error — safe because no SQL was sent on the first attempt (the connection was dead before the BEGIN).
238 239 240 241 242 |
# File 'lib/pgbus/client.rb', line 238 def transaction(&block) with_stale_connection_retry do synchronized { @pgmq.transaction(&block) } end end |
#update_notify_insert(queue_name, throttle_interval_ms:) ⇒ Object
408 409 410 411 412 413 |
# File 'lib/pgbus/client.rb', line 408 def update_notify_insert(queue_name, throttle_interval_ms:) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized { @pgmq.update_notify_insert(full_name, throttle_interval_ms: throttle_interval_ms) } end end |
#wait_for_notify(queue_name, timeout: nil, &block) ⇒ Object
— LISTEN/NOTIFY management (PGMQ v1.11.0+) —
401 402 403 404 405 406 |
# File 'lib/pgbus/client.rb', line 401 def wait_for_notify(queue_name, timeout: nil, &block) full_name = config.queue_name(queue_name) with_stale_connection_retry do synchronized { @pgmq.wait_for_notify(full_name, timeout: timeout, &block) } end end |