Module: PGMQ::Client::MessageLifecycle
Included in: PGMQ::Client
Defined in: lib/pgmq/client/message_lifecycle.rb
Overview
Message lifecycle operations (pop, delete, archive, visibility timeout)
This module handles message state transitions including popping (atomic read+delete), deleting, archiving, and updating visibility timeouts.
Instance Method Summary
- #archive(queue_name, msg_id) ⇒ Boolean
  Archives a message.
- #archive_batch(queue_name, msg_ids) ⇒ Array<Integer>
  Archives multiple messages.
- #archive_multi(archives) ⇒ Hash
  Archives specific messages from multiple queues in a single transaction.
- #delete(queue_name, msg_id) ⇒ Boolean
  Deletes a message from the queue.
- #delete_batch(queue_name, msg_ids) ⇒ Array<Integer>
  Deletes multiple messages from the queue.
- #delete_multi(deletions) ⇒ Hash
  Deletes specific messages from multiple queues in a single transaction.
- #pop(queue_name) ⇒ PGMQ::Message?
  Pops a message (atomic read + delete).
- #pop_batch(queue_name, qty) ⇒ Array<PGMQ::Message>
  Pops multiple messages atomically (atomic read + delete for batch).
- #set_vt(queue_name, msg_id, vt:) ⇒ PGMQ::Message?
  Updates the visibility timeout for a message.
- #set_vt_batch(queue_name, msg_ids, vt:) ⇒ Array<PGMQ::Message>
  Updates visibility timeout for multiple messages.
- #set_vt_multi(updates, vt:) ⇒ Hash
  Updates visibility timeout for messages across multiple queues in a single transaction.
Instance Method Details
#archive(queue_name, msg_id) ⇒ Boolean
Archives a message
    # File 'lib/pgmq/client/message_lifecycle.rb', line 152
    def archive(queue_name, msg_id)
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        conn.exec_params(
          "SELECT pgmq.archive($1::text, $2::bigint)",
          [queue_name, msg_id]
        )
      end

      return false if result.ntuples.zero?

      result[0]["archive"] == "t"
    end
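The difference between archiving and deleting can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `TinyQueue` is a hypothetical class, not part of the gem, and it assumes that archiving keeps the message in a separate archive store while deleting discards it. Both operations report whether the ID was found, matching the Boolean return documented above.

```ruby
# Hypothetical in-memory model (not the gem's implementation): archiving moves
# a message into a separate archive store, deleting discards it.
class TinyQueue
  attr_reader :live, :archived

  def initialize(messages)
    @live = messages.dup # msg_id => payload
    @archived = {}
  end

  # Returns true when the message existed and was moved to the archive.
  def archive(msg_id)
    return false unless @live.key?(msg_id)

    @archived[msg_id] = @live.delete(msg_id)
    true
  end

  # Returns true when the message existed and was removed.
  def delete(msg_id)
    !@live.delete(msg_id).nil?
  end
end

queue = TinyQueue.new(1 => "invoice", 2 => "receipt")
p queue.archive(1)  # first archive of an existing message
p queue.delete(2)   # delete of an existing message
p queue.archive(1)  # the message is no longer in the live queue
p queue.archived
```

A second `archive` call for the same ID returns `false` in this model, because the message has already left the live queue.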
#archive_batch(queue_name, msg_ids) ⇒ Array<Integer>
Archives multiple messages
    # File 'lib/pgmq/client/message_lifecycle.rb', line 178
    def archive_batch(queue_name, msg_ids)
      validate_queue_name!(queue_name)
      return [] if msg_ids.empty?

      # Use PostgreSQL array parameter binding
      result = with_connection do |conn|
        encoder = PG::TextEncoder::Array.new
        encoded_array = encoder.encode(msg_ids)

        conn.exec_params(
          "SELECT * FROM pgmq.archive($1::text, $2::bigint[])",
          [queue_name, encoded_array]
        )
      end

      result.map { |row| row["archive"] }
    end
#archive_multi(archives) ⇒ Hash
Archives specific messages from multiple queues in a single transaction
Efficiently archives messages across different queues atomically.
    # File 'lib/pgmq/client/message_lifecycle.rb', line 211
    def archive_multi(archives)
      raise ArgumentError, "archives must be a hash" unless archives.is_a?(Hash)
      return {} if archives.empty?

      # Validate all queue names
      archives.each_key { |qn| validate_queue_name!(qn) }

      transaction do |txn|
        result = {}
        archives.each do |queue_name, msg_ids|
          next if msg_ids.empty?

          archived_ids = txn.archive_batch(queue_name, msg_ids)
          result[queue_name] = archived_ids
        end
        result
      end
    end
#delete(queue_name, msg_id) ⇒ Boolean
Deletes a message from the queue
    # File 'lib/pgmq/client/message_lifecycle.rb', line 58
    def delete(queue_name, msg_id)
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        conn.exec_params(
          "SELECT pgmq.delete($1::text, $2::bigint)",
          [queue_name, msg_id]
        )
      end

      return false if result.ntuples.zero?

      result[0]["delete"] == "t"
    end
#delete_batch(queue_name, msg_ids) ⇒ Array<Integer>
Deletes multiple messages from the queue
    # File 'lib/pgmq/client/message_lifecycle.rb', line 84
    def delete_batch(queue_name, msg_ids)
      validate_queue_name!(queue_name)
      return [] if msg_ids.empty?

      # Use PostgreSQL array parameter binding
      result = with_connection do |conn|
        encoder = PG::TextEncoder::Array.new
        encoded_array = encoder.encode(msg_ids)

        conn.exec_params(
          "SELECT * FROM pgmq.delete($1::text, $2::bigint[])",
          [queue_name, encoded_array]
        )
      end

      result.map { |row| row["delete"] }
    end
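The batch methods bind the whole ID list as a single array parameter (`$2::bigint[]`). For a flat list of integers, PostgreSQL's text form of an array is a brace-delimited literal such as `{1,2,3}`. The sketch below builds that literal by hand purely to show what travels over the wire. `bigint_array_literal` is a hypothetical helper, not part of the gem or of `pg`; the method above relies on `PG::TextEncoder::Array`, which also handles quoting and nesting that this sketch ignores.

```ruby
# Hypothetical helper for illustration only: renders a flat list of integer
# IDs in PostgreSQL's text array format. Integer() rejects anything that is
# not a valid integer, so stray strings never reach the literal.
def bigint_array_literal(msg_ids)
  "{#{msg_ids.map { |id| Integer(id) }.join(',')}}"
end

puts bigint_array_literal([1, 2, 3])
puts bigint_array_literal([])
```

An empty list renders as `{}` here, although the batch methods return early for empty input and never send such a parameter.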
#delete_multi(deletions) ⇒ Hash
Deletes specific messages from multiple queues in a single transaction
Efficiently deletes messages across different queues atomically. Useful when processing related messages from different queues.
    # File 'lib/pgmq/client/message_lifecycle.rb', line 125
    def delete_multi(deletions)
      raise ArgumentError, "deletions must be a hash" unless deletions.is_a?(Hash)
      return {} if deletions.empty?

      # Validate all queue names
      deletions.each_key { |qn| validate_queue_name!(qn) }

      transaction do |txn|
        result = {}
        deletions.each do |queue_name, msg_ids|
          next if msg_ids.empty?

          deleted_ids = txn.delete_batch(queue_name, msg_ids)
          result[queue_name] = deleted_ids
        end
        result
      end
    end
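The input and output shapes of `delete_multi` can be sketched without a database. `FakeQueues` below is a hypothetical in-memory stand-in, not the gem's client. It assumes the batch call returns only the IDs that were actually present, and it mirrors the listing above in skipping queues whose ID list is empty. It does not model the transaction.

```ruby
# Hypothetical in-memory stand-in modelling only the documented shapes:
#   input  { queue_name => [msg_id, ...] }
#   output { queue_name => [ids actually deleted] }
class FakeQueues
  def initialize(contents)
    @contents = contents.transform_values(&:dup)
  end

  # Returns the subset of msg_ids that existed and removes them.
  def delete_batch(queue_name, msg_ids)
    present = @contents.fetch(queue_name, [])
    deleted = msg_ids & present
    @contents[queue_name] = present - deleted
    deleted
  end

  def delete_multi(deletions)
    raise ArgumentError, "deletions must be a hash" unless deletions.is_a?(Hash)

    deletions.each_with_object({}) do |(queue_name, msg_ids), result|
      next if msg_ids.empty? # empty lists produce no key in the result

      result[queue_name] = delete_batch(queue_name, msg_ids)
    end
  end
end

queues = FakeQueues.new("orders" => [1, 2, 3], "emails" => [10, 11])
p queues.delete_multi("orders" => [1, 2, 99], "emails" => [], "audit" => [5])
```

In this model the `emails` key is absent from the result because its ID list was empty, while `audit` maps to an empty array because nothing matched.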
#pop(queue_name) ⇒ PGMQ::Message?
Pops a message (atomic read + delete)
    # File 'lib/pgmq/client/message_lifecycle.rb', line 18
    def pop(queue_name)
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        conn.exec_params("SELECT * FROM pgmq.pop($1::text)", [queue_name])
      end

      return nil if result.ntuples.zero?

      Message.new(result[0])
    end
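Because `pop` returns `nil` when no message is available, a consumer can loop until it sees `nil`. The following is a minimal sketch of that pattern. `FakePopClient` and `drain` are hypothetical names, and the fake only models the nil-on-empty contract, not the database call.

```ruby
# Hypothetical stand-in for a client; only the nil-on-empty contract of #pop
# is modelled here. No database is involved.
class FakePopClient
  def initialize(payloads)
    @payloads = payloads.dup
  end

  # One message per call, or nil when nothing is left.
  def pop(_queue_name)
    @payloads.shift
  end
end

# Pops until the queue reports empty, yielding each message to the caller.
# Returns the number of messages handled.
def drain(client, queue_name)
  count = 0
  while (msg = client.pop(queue_name))
    yield msg
    count += 1
  end
  count
end

client = FakePopClient.new(%w[a b c])
seen = []
drained = drain(client, "orders") { |msg| seen << msg }
puts "drained #{drained}: #{seen.inspect}"
```

Since a popped message is deleted as part of the read, a failure inside the block means that message is not redelivered. Workloads that need retries would more likely read the message, process it, and then call `delete` or `archive`.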
#pop_batch(queue_name, qty) ⇒ Array<PGMQ::Message>
Pops multiple messages atomically (atomic read + delete for batch)
    # File 'lib/pgmq/client/message_lifecycle.rb', line 39
    def pop_batch(queue_name, qty)
      validate_queue_name!(queue_name)
      return [] if qty <= 0

      result = with_connection do |conn|
        conn.exec_params("SELECT * FROM pgmq.pop($1::text, $2::integer)", [queue_name, qty])
      end

      result.map { |row| Message.new(row) }
    end
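A batch consumer can treat an empty array as the stop signal. The sketch below assumes a hypothetical `FakeBatchClient` that mirrors only two behaviours from the listing above: a non-positive `qty` yields `[]`, and a call returns at most `qty` messages.

```ruby
# Hypothetical stand-in for a client; models only the array-returning
# contract of #pop_batch.
class FakeBatchClient
  def initialize(payloads)
    @payloads = payloads.dup
  end

  # Non-positive qty yields an empty array, as in the documented guard.
  def pop_batch(_queue_name, qty)
    return [] if qty <= 0

    @payloads.shift(qty)
  end
end

# Collects batches until an empty batch signals that the queue is drained.
def drain_in_batches(client, queue_name, batch_size)
  batches = []
  loop do
    batch = client.pop_batch(queue_name, batch_size)
    break if batch.empty?

    batches << batch
  end
  batches
end

client = FakeBatchClient.new((1..5).to_a)
p drain_in_batches(client, "orders", 2)
```

The final batch may be shorter than `batch_size`, so callers should not assume a fixed batch length.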
#set_vt(queue_name, msg_id, vt:) ⇒ PGMQ::Message?
Updates the visibility timeout for a message
Supports two modes:
- Integer offset (seconds from now): `vt: 60` makes the message visible in 60 seconds
- Absolute timestamp: `vt: Time.now + 300` makes the message visible at a specific time
    # File 'lib/pgmq/client/message_lifecycle.rb', line 246
    def set_vt(queue_name, msg_id, vt:)
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        if vt.is_a?(Time)
          conn.exec_params(
            "SELECT * FROM pgmq.set_vt($1::text, $2::bigint, $3::timestamptz)",
            [queue_name, msg_id, vt.utc.iso8601(6)]
          )
        else
          conn.exec_params(
            "SELECT * FROM pgmq.set_vt($1::text, $2::bigint, $3::integer)",
            [queue_name, msg_id, vt]
          )
        end
      end

      return nil if result.ntuples.zero?

      Message.new(result[0])
    end
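The two `vt:` modes come down to a single type check. The sketch below isolates that branch so the bound values are easy to inspect. `vt_binding` is a hypothetical helper written for this illustration, not a method of the gem. It returns the SQL cast and the parameter value that the listing above would send for a given argument.

```ruby
require "time" # provides Time#iso8601 with fractional digits

# Hypothetical helper mirroring the branch in #set_vt: a Time is sent as a
# UTC ISO 8601 string cast to timestamptz, anything else is sent as-is and
# cast to integer (seconds from now).
def vt_binding(vt)
  if vt.is_a?(Time)
    ["timestamptz", vt.utc.iso8601(6)]
  else
    ["integer", vt]
  end
end

p vt_binding(60)
p vt_binding(Time.utc(2025, 1, 1, 12, 0, 0))
```

Any value that is not a `Time` falls through to the integer branch, so a value PostgreSQL cannot cast to `integer` would presumably be rejected there rather than in Ruby.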
#set_vt_batch(queue_name, msg_ids, vt:) ⇒ Array<PGMQ::Message>
Updates visibility timeout for multiple messages
Supports two modes:
- Integer offset (seconds from now): `vt: 60` makes the messages visible in 60 seconds
- Absolute timestamp: `vt: Time.now + 300` makes the messages visible at a specific time
    # File 'lib/pgmq/client/message_lifecycle.rb', line 284
    def set_vt_batch(queue_name, msg_ids, vt:)
      validate_queue_name!(queue_name)
      return [] if msg_ids.empty?

      result = with_connection do |conn|
        encoder = PG::TextEncoder::Array.new
        encoded_array = encoder.encode(msg_ids)

        if vt.is_a?(Time)
          conn.exec_params(
            "SELECT * FROM pgmq.set_vt($1::text, $2::bigint[], $3::timestamptz)",
            [queue_name, encoded_array, vt.utc.iso8601(6)]
          )
        else
          conn.exec_params(
            "SELECT * FROM pgmq.set_vt($1::text, $2::bigint[], $3::integer)",
            [queue_name, encoded_array, vt]
          )
        end
      end

      result.map { |row| Message.new(row) }
    end
#set_vt_multi(updates, vt:) ⇒ Hash
Updates visibility timeout for messages across multiple queues in a single transaction
Efficiently updates visibility timeouts across different queues atomically. Useful when processing related messages from different queues and needing to extend their visibility timeouts together.
Supports two modes:
- Integer offset (seconds from now): `vt: 60` makes the messages visible in 60 seconds
- Absolute timestamp: `vt: Time.now + 300` makes the messages visible at a specific time
    # File 'lib/pgmq/client/message_lifecycle.rb', line 337
    def set_vt_multi(updates, vt:)
      raise ArgumentError, "updates must be a hash" unless updates.is_a?(Hash)
      return {} if updates.empty?

      # Validate all queue names
      updates.each_key { |qn| validate_queue_name!(qn) }

      transaction do |txn|
        result = {}
        updates.each do |queue_name, msg_ids|
          next if msg_ids.empty?

          # Store the updated messages for this queue
          result[queue_name] = txn.set_vt_batch(queue_name, msg_ids, vt: vt)
        end
        result
      end
    end
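`set_vt_multi` expects a hash that maps each queue name to an array of message IDs. When a worker tracks its in-flight messages as a flat list, that hash can be built with `group_by`. In the sketch below, `InFlight` and `vt_updates` are hypothetical names, and tracking the queue name alongside each message ID is an assumption about the caller's bookkeeping, not something the gem provides.

```ruby
# Hypothetical record kept by the caller for each message it is processing.
InFlight = Struct.new(:queue_name, :msg_id)

# Groups in-flight messages into the { queue_name => [msg_id, ...] } shape
# that set_vt_multi takes as its first argument.
def vt_updates(in_flight)
  in_flight
    .group_by(&:queue_name)
    .transform_values { |items| items.map(&:msg_id) }
end

in_flight = [
  InFlight.new("orders", 1),
  InFlight.new("emails", 10),
  InFlight.new("orders", 2)
]

updates = vt_updates(in_flight)
p updates
# With a real client, the extension itself would be a single call:
#   client.set_vt_multi(updates, vt: 60)
```

Building the hash up front keeps the transactional call short, which matters because every queue in the hash is updated inside one transaction.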