Module: PGMQ::Client::MessageLifecycle

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/message_lifecycle.rb

Overview

Message lifecycle operations (pop, delete, archive, visibility timeout)

This module handles message state transitions: popping (an atomic read and delete), deleting, archiving, and updating visibility timeouts.


Instance Method Details

#archive(queue_name, msg_id) ⇒ Boolean

Archives a message

Examples:

client.archive("orders", 123)

Parameters:

  • queue_name (String)

    name of the queue

  • msg_id (Integer)

    message ID to archive

Returns:

  • (Boolean)

    true if message was archived
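
Because #archive reports its outcome as a Boolean, callers may want to branch on the result instead of assuming success. The following is a minimal sketch, not the library's own code: `FakeClient` and `archive_and_report` are hypothetical names, and the fake only mimics the documented true/false contract.

```ruby
# FakeClient is a hypothetical in-memory stand-in for PGMQ::Client.
# It only mimics the documented Boolean contract of #archive.
class FakeClient
  def initialize(msg_ids)
    @msg_ids = msg_ids.dup
  end

  # Returns true when the ID was present (and is now removed), else false.
  def archive(_queue_name, msg_id)
    !@msg_ids.delete(msg_id).nil?
  end
end

# Hypothetical helper: archive a message and report what happened.
def archive_and_report(client, queue_name, msg_id)
  if client.archive(queue_name, msg_id)
    :archived
  else
    :not_found # the ID was not in the queue when the call ran
  end
end

client = FakeClient.new([123])
first  = archive_and_report(client, "orders", 123)
second = archive_and_report(client, "orders", 123)
```

In this sketch the second call yields `:not_found`, since the first call already removed the ID from the fake queue.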



# File 'lib/pgmq/client/message_lifecycle.rb', line 152

def archive(
  queue_name,
  msg_id
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.archive($1::text, $2::bigint)",
      [queue_name, msg_id]
    )
  end

  return false if result.ntuples.zero?

  result[0]["archive"] == "t"
end

#archive_batch(queue_name, msg_ids) ⇒ Array<Integer>

Archives multiple messages

Examples:

archived = client.archive_batch("orders", [101, 102, 103])

Parameters:

  • queue_name (String)

    name of the queue

  • msg_ids (Array<Integer>)

    array of message IDs to archive

Returns:

  • (Array<Integer>)

    array of successfully archived message IDs
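
Since only the successfully archived IDs come back, the caller can work out which requested IDs were not archived by subtracting the two arrays. A small sketch under stated assumptions: `split_batch_result` is a hypothetical helper, and the `Integer()` normalisation is a precaution in case the database driver returns IDs as strings.

```ruby
# Hypothetical helper: given the IDs that were requested and the IDs the
# batch call reported back, split them into archived and missing.
def split_batch_result(requested_ids, returned_ids)
  # Assumption: IDs may arrive as strings, so coerce each one to Integer.
  archived = returned_ids.map { |id| Integer(id) }
  { archived: archived, missing: requested_ids - archived }
end

# Simulated return value; a real call would be
# client.archive_batch("orders", [101, 102, 103]).
outcome = split_batch_result([101, 102, 103], ["101", 103])
```

The same subtraction applies to any batch call that returns the IDs it acted on.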



# File 'lib/pgmq/client/message_lifecycle.rb', line 178

def archive_batch(
  queue_name,
  msg_ids
)
  validate_queue_name!(queue_name)
  return [] if msg_ids.empty?

  # Use PostgreSQL array parameter binding
  result = with_connection do |conn|
    encoder = PG::TextEncoder::Array.new
    encoded_array = encoder.encode(msg_ids)

    conn.exec_params(
      "SELECT * FROM pgmq.archive($1::text, $2::bigint[])",
      [queue_name, encoded_array]
    )
  end

  result.map { |row| row["archive"] }
end

#archive_multi(archives) ⇒ Hash

Archives specific messages from multiple queues in a single transaction

Archives messages from different queues atomically: every per-queue batch archive runs inside one transaction.

Examples:

Archive messages from multiple queues

client.archive_multi({
  'orders' => [1, 2],
  'notifications' => [5]
})

Parameters:

  • archives (Hash)

    hash of queue_name => array of msg_ids

Returns:

  • (Hash)

    hash of queue_name => array of archived msg_ids (queues given an empty array are omitted)

Raises:

  • (ArgumentError)
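
The input and output shapes can be illustrated without a database. This is a transaction-free sketch, assuming a hypothetical helper named `each_queue_batch`: it rejects non-Hash input, skips queues with no IDs, and collects one result per remaining queue. The transactional guarantee itself is not modelled here.

```ruby
# Hypothetical, transaction-free mirror of the per-queue control flow.
def each_queue_batch(requests)
  raise ArgumentError, "requests must be a hash" unless requests.is_a?(Hash)

  requests.each_with_object({}) do |(queue_name, msg_ids), result|
    next if msg_ids.empty? # queues with no IDs are left out of the result

    # The caller's block plays the role of the per-queue batch call.
    result[queue_name] = yield(queue_name, msg_ids)
  end
end

archived = each_queue_batch("orders" => [1, 2], "emails" => []) do |_queue, ids|
  ids # pretend every requested ID was archived
end
```

Note that "emails" does not appear in the result because its ID list was empty.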


# File 'lib/pgmq/client/message_lifecycle.rb', line 211

def archive_multi(archives)
  raise ArgumentError, "archives must be a hash" unless archives.is_a?(Hash)
  return {} if archives.empty?

  # Validate all queue names
  archives.each_key { |qn| validate_queue_name!(qn) }

  transaction do |txn|
    result = {}
    archives.each do |queue_name, msg_ids|
      next if msg_ids.empty?

      archived_ids = txn.archive_batch(queue_name, msg_ids)
      result[queue_name] = archived_ids
    end
    result
  end
end

#delete(queue_name, msg_id) ⇒ Boolean

Deletes a message from the queue

Examples:

client.delete("orders", 123)

Parameters:

  • queue_name (String)

    name of the queue

  • msg_id (Integer)

    message ID to delete

Returns:

  • (Boolean)

    true if message was deleted



# File 'lib/pgmq/client/message_lifecycle.rb', line 58

def delete(
  queue_name,
  msg_id
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.delete($1::text, $2::bigint)",
      [queue_name, msg_id]
    )
  end

  return false if result.ntuples.zero?

  result[0]["delete"] == "t"
end

#delete_batch(queue_name, msg_ids) ⇒ Array<Integer>

Deletes multiple messages from the queue

Examples:

deleted = client.delete_batch("orders", [101, 102, 103])

Parameters:

  • queue_name (String)

    name of the queue

  • msg_ids (Array<Integer>)

    array of message IDs to delete

Returns:

  • (Array<Integer>)

    array of successfully deleted message IDs



# File 'lib/pgmq/client/message_lifecycle.rb', line 84

def delete_batch(
  queue_name,
  msg_ids
)
  validate_queue_name!(queue_name)
  return [] if msg_ids.empty?

  # Use PostgreSQL array parameter binding
  result = with_connection do |conn|
    encoder = PG::TextEncoder::Array.new
    encoded_array = encoder.encode(msg_ids)

    conn.exec_params(
      "SELECT * FROM pgmq.delete($1::text, $2::bigint[])",
      [queue_name, encoded_array]
    )
  end

  result.map { |row| row["delete"] }
end

#delete_multi(deletions) ⇒ Hash

Deletes specific messages from multiple queues in a single transaction

Deletes messages from different queues atomically: every per-queue batch delete runs inside one transaction. Useful when processing related messages from different queues.

Examples:

Delete messages from multiple queues

client.delete_multi({
  'orders' => [1, 2, 3],
  'notifications' => [5, 6],
  'emails' => [10]
})
# => { 'orders' => [1, 2, 3], 'notifications' => [5, 6], 'emails' => [10] }

Clean up after batch processing across queues

messages = client.read_multi(['q1', 'q2', 'q3'], qty: 10)
deletions = messages.group_by(&:queue_name).transform_values { |msgs| msgs.map(&:msg_id) }
client.delete_multi(deletions)

Parameters:

  • deletions (Hash)

    hash of queue_name => array of msg_ids

Returns:

  • (Hash)

    hash of queue_name => array of deleted msg_ids (queues given an empty array are omitted)

Raises:

  • (ArgumentError)
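
The `deletions` hash can be built from a flat list of messages with `group_by` and `transform_values`. A self-contained sketch of that reshaping step, where `Msg` is a hypothetical Struct standing in for message objects that expose `queue_name` and `msg_id`:

```ruby
# Msg is a hypothetical stand-in exposing the two readers used for grouping.
Msg = Struct.new(:queue_name, :msg_id)

messages = [Msg.new("q1", 1), Msg.new("q2", 7), Msg.new("q1", 2)]

# Group by queue, then keep only the IDs for each queue.
deletions = messages
  .group_by(&:queue_name)
  .transform_values { |msgs| msgs.map(&:msg_id) }
```

The resulting hash has the queue_name => array of msg_ids shape that the `deletions` parameter expects.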


# File 'lib/pgmq/client/message_lifecycle.rb', line 125

def delete_multi(deletions)
  raise ArgumentError, "deletions must be a hash" unless deletions.is_a?(Hash)
  return {} if deletions.empty?

  # Validate all queue names
  deletions.each_key { |qn| validate_queue_name!(qn) }

  transaction do |txn|
    result = {}
    deletions.each do |queue_name, msg_ids|
      next if msg_ids.empty?

      deleted_ids = txn.delete_batch(queue_name, msg_ids)
      result[queue_name] = deleted_ids
    end
    result
  end
end

#pop(queue_name) ⇒ PGMQ::Message?

Pops a message (atomic read + delete)

Examples:

msg = client.pop("orders")
process(msg.payload) if msg

Parameters:

  • queue_name (String)

    name of the queue

Returns:

  • (PGMQ::Message, nil)

    message object or nil if queue is empty
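
Because #pop returns nil once the queue is empty, a `while` loop is a natural way to drain a queue one message at a time. A minimal sketch using hypothetical in-memory stand-ins (`FakeMessage`, `FakeQueueClient`) in place of a real client. Keep in mind that a popped message is already removed from the queue, so it is not redelivered if processing fails afterwards.

```ruby
# FakeMessage and FakeQueueClient are hypothetical in-memory stand-ins.
FakeMessage = Struct.new(:msg_id, :payload)

class FakeQueueClient
  def initialize(messages)
    @messages = messages.dup
  end

  # Mimics the documented contract: next message, or nil when empty.
  def pop(_queue_name)
    @messages.shift
  end
end

client = FakeQueueClient.new([FakeMessage.new(1, "a"), FakeMessage.new(2, "b")])

processed = []
while (msg = client.pop("orders"))
  processed << msg.payload # the message is already gone from the queue here
end
```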



# File 'lib/pgmq/client/message_lifecycle.rb', line 18

def pop(queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params("SELECT * FROM pgmq.pop($1::text)", [queue_name])
  end

  return nil if result.ntuples.zero?

  Message.new(result[0])
end

#pop_batch(queue_name, qty) ⇒ Array<PGMQ::Message>

Pops multiple messages atomically (atomic read + delete for batch)

Examples:

Pop up to 10 messages

messages = client.pop_batch("orders", 10)
messages.each { |msg| process(msg.payload) }

Parameters:

  • queue_name (String)

    name of the queue

  • qty (Integer)

    maximum number of messages to pop

Returns:

  • (Array<PGMQ::Message>)

    array of message objects (empty if queue is empty)
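
Since an empty array signals an empty queue, a queue can be drained in fixed-size batches by looping until the batch comes back empty. A sketch under stated assumptions: `FakeBatchClient` and `drain` are hypothetical names, and the fake stores bare payload strings instead of message objects.

```ruby
# FakeBatchClient is a hypothetical in-memory stand-in for PGMQ::Client.
class FakeBatchClient
  def initialize(payloads)
    @payloads = payloads.dup
  end

  # Mimics the documented contract: up to qty items, [] when empty.
  def pop_batch(_queue_name, qty)
    return [] if qty <= 0

    @payloads.shift(qty)
  end
end

# Hypothetical helper: pop batches until the queue reports empty.
def drain(client, queue_name, batch_size)
  batches = []
  loop do
    batch = client.pop_batch(queue_name, batch_size)
    break if batch.empty?

    batches << batch
  end
  batches
end

batches = drain(FakeBatchClient.new(%w[a b c d e]), "orders", 2)
```

The final batch may be shorter than `batch_size`, because `qty` is a maximum rather than an exact count.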



# File 'lib/pgmq/client/message_lifecycle.rb', line 39

def pop_batch(queue_name, qty)
  validate_queue_name!(queue_name)
  return [] if qty <= 0

  result = with_connection do |conn|
    conn.exec_params("SELECT * FROM pgmq.pop($1::text, $2::integer)", [queue_name, qty])
  end

  result.map { |row| Message.new(row) }
end

#set_vt(queue_name, msg_id, vt:) ⇒ PGMQ::Message?

Updates the visibility timeout for a message

Supports two modes:

  • Integer offset (seconds from now): `vt: 60` - message visible in 60 seconds

  • Absolute timestamp: `vt: Time.now + 300` - message visible at specific time
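
The two modes differ in how the value is handed to the database: a Time is converted to UTC and sent as an ISO 8601 string with microsecond precision, while any other value is passed through as the offset. A standalone sketch of that dispatch, where `vt_sql_argument` is a hypothetical helper and not part of the library:

```ruby
require "time" # provides Time#iso8601 on Rubies where it is not built in

# Hypothetical helper mirroring the Integer/Time dispatch:
# returns the SQL cast name and the value that would be bound.
def vt_sql_argument(vt)
  if vt.is_a?(Time)
    ["timestamptz", vt.utc.iso8601(6)]
  else
    ["integer", vt]
  end
end

absolute = vt_sql_argument(Time.utc(2024, 1, 2, 3, 4, 5))
offset   = vt_sql_argument(60)
```

Here `absolute` carries the string "2024-01-02T03:04:05.000000Z" and `offset` carries the integer 60 unchanged.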

Examples:

Extend processing time by 60 more seconds (offset)

msg = client.set_vt("orders", 123, vt: 60)

Set absolute visibility time (timestamp)

msg = client.set_vt("orders", 123, vt: Time.now + 300)

Parameters:

  • queue_name (String)

    name of the queue

  • msg_id (Integer)

    message ID

  • vt (Integer, Time)

    visibility timeout as seconds offset or absolute timestamp

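Returns:

  • (PGMQ::Message, nil)

    updated message object, or nil if no row was returned for the given message ID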



# File 'lib/pgmq/client/message_lifecycle.rb', line 246

def set_vt(queue_name, msg_id, vt:)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if vt.is_a?(Time)
      conn.exec_params(
        "SELECT * FROM pgmq.set_vt($1::text, $2::bigint, $3::timestamptz)",
        [queue_name, msg_id, vt.utc.iso8601(6)]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.set_vt($1::text, $2::bigint, $3::integer)",
        [queue_name, msg_id, vt]
      )
    end
  end

  return nil if result.ntuples.zero?

  Message.new(result[0])
end

#set_vt_batch(queue_name, msg_ids, vt:) ⇒ Array<PGMQ::Message>

Updates visibility timeout for multiple messages

Supports two modes:

  • Integer offset (seconds from now): `vt: 60` - messages visible in 60 seconds

  • Absolute timestamp: `vt: Time.now + 300` - messages visible at specific time

Examples:

Extend processing time for multiple messages (offset)

messages = client.set_vt_batch("orders", [101, 102, 103], vt: 60)

Set absolute visibility time (timestamp)

messages = client.set_vt_batch("orders", [101, 102], vt: Time.now + 300)

Parameters:

  • queue_name (String)

    name of the queue

  • msg_ids (Array<Integer>)

    array of message IDs

  • vt (Integer, Time)

    visibility timeout as seconds offset or absolute timestamp

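Returns:

  • (Array<PGMQ::Message>)

    array of updated message objects (empty if msg_ids is empty or no rows were returned)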



# File 'lib/pgmq/client/message_lifecycle.rb', line 284

def set_vt_batch(queue_name, msg_ids, vt:)
  validate_queue_name!(queue_name)
  return [] if msg_ids.empty?

  result = with_connection do |conn|
    encoder = PG::TextEncoder::Array.new
    encoded_array = encoder.encode(msg_ids)

    if vt.is_a?(Time)
      conn.exec_params(
        "SELECT * FROM pgmq.set_vt($1::text, $2::bigint[], $3::timestamptz)",
        [queue_name, encoded_array, vt.utc.iso8601(6)]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.set_vt($1::text, $2::bigint[], $3::integer)",
        [queue_name, encoded_array, vt]
      )
    end
  end

  result.map { |row| Message.new(row) }
end

#set_vt_multi(updates, vt:) ⇒ Hash

Updates visibility timeout for messages across multiple queues in a single transaction

Updates visibility timeouts across different queues atomically: every per-queue batch update runs inside one transaction. Useful when processing related messages from different queues and needing to extend their visibility timeouts together.

Supports two modes:

  • Integer offset (seconds from now): `vt: 60` - messages visible in 60 seconds

  • Absolute timestamp: `vt: Time.now + 300` - messages visible at specific time

Examples:

Extend visibility timeout for messages from multiple queues

client.set_vt_multi({
  'orders' => [1, 2, 3],
  'notifications' => [5, 6],
  'emails' => [10]
}, vt: 60)
# => { 'orders' => [<Message>, ...], 'notifications' => [...], 'emails' => [...] }

Set absolute visibility time

client.set_vt_multi(updates, vt: Time.now + 300)

Extend timeout after batch reading from multiple queues

messages = client.read_multi(['q1', 'q2', 'q3'], qty: 10)
updates = messages.group_by(&:queue_name).transform_values { |msgs| msgs.map(&:msg_id) }
client.set_vt_multi(updates, vt: 120)

Parameters:

  • updates (Hash)

    hash of queue_name => array of msg_ids

  • vt (Integer, Time)

    visibility timeout as seconds offset or absolute timestamp

Returns:

  • (Hash)

    hash of queue_name => array of updated PGMQ::Message objects (queues given an empty array are omitted)

Raises:

  • (ArgumentError)


# File 'lib/pgmq/client/message_lifecycle.rb', line 337

def set_vt_multi(updates, vt:)
  raise ArgumentError, "updates must be a hash" unless updates.is_a?(Hash)
  return {} if updates.empty?

  # Validate all queue names
  updates.each_key { |qn| validate_queue_name!(qn) }

  transaction do |txn|
    result = {}
    updates.each do |queue_name, msg_ids|
      next if msg_ids.empty?

      updated_messages = txn.set_vt_batch(queue_name, msg_ids, vt: vt)
      result[queue_name] = updated_messages
    end
    result
  end
end