Module: PGMQ::Client::Consumer

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/consumer.rb

Overview

Single-queue message reading operations

This module handles reading messages from a single queue: basic reads, batch reads, grouped round-robin reads, and long-polling for efficient message consumption.

Instance Method Details

#read(queue_name, vt: DEFAULT_VT, conditional: {}) ⇒ PGMQ::Message?

Reads a message from the queue

Examples:

msg = client.read("orders", vt: 30)
if msg
  process(msg.payload)
  client.delete("orders", msg.msg_id)
end

With conditional filtering

msg = client.read("orders", vt: 30, conditional: { type: "priority", status: "pending" })

Parameters:

  • queue_name (String)

    name of the queue

  • vt (Integer) (defaults to: DEFAULT_VT)

    visibility timeout in seconds

  • conditional (Hash) (defaults to: {})

    optional JSONB filter for message payload

Returns:

  • (PGMQ::Message, nil)

    message object, or nil if no message is available to read



# File 'lib/pgmq/client/consumer.rb', line 26

def read(
  queue_name,
  vt: DEFAULT_VT,
  conditional: {}
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if conditional.empty?
      conn.exec_params(
        "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)",
        [queue_name, vt, 1]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)",
        [queue_name, vt, 1, conditional.to_json]
      )
    end
  end

  return nil if result.ntuples.zero?

  Message.new(result[0])
end
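
As the source above shows, the conditional Hash is serialized with to_json before it is bound to the $4::jsonb parameter. The following is a minimal sketch of that serialization step using only Ruby's standard json library. The helper name jsonb_param is hypothetical and is not part of PGMQ::Client.

```ruby
require "json"

# Hypothetical helper (not part of PGMQ::Client): mirrors how #read turns the
# conditional Hash into the string bound to the $4::jsonb parameter.
def jsonb_param(conditional)
  conditional.to_json
end

filter = { type: "priority", status: "pending" }
puts jsonb_param(filter)
# prints {"type":"priority","status":"pending"}
```

Symbol keys and string keys serialize to the same JSON, so either style of Hash can be passed as the filter.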

#read_batch(queue_name, vt: DEFAULT_VT, qty: 1, conditional: {}) ⇒ Array<PGMQ::Message>

Reads multiple messages from the queue

Examples:

messages = client.read_batch("orders", vt: 30, qty: 10)
messages.each do |msg|
  process(msg.payload)
  client.delete("orders", msg.msg_id)
end

With conditional filtering

messages = client.read_batch(
  "orders",
  vt: 30,
  qty: 10,
  conditional: { priority: "high" }
)

Parameters:

  • queue_name (String)

    name of the queue

  • vt (Integer) (defaults to: DEFAULT_VT)

    visibility timeout in seconds

  • qty (Integer) (defaults to: 1)

    number of messages to read

  • conditional (Hash) (defaults to: {})

    optional JSONB filter for message payload

Returns:

  • (Array<PGMQ::Message>)

    array of messages; empty when no messages are available


# File 'lib/pgmq/client/consumer.rb', line 74

def read_batch(
  queue_name,
  vt: DEFAULT_VT,
  qty: 1,
  conditional: {}
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if conditional.empty?
      conn.exec_params(
        "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)",
        [queue_name, vt, qty]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)",
        [queue_name, vt, qty, conditional.to_json]
      )
    end
  end

  result.map { |row| Message.new(row) }
end
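
A message that is read but not deleted becomes visible again once its visibility timeout expires, so a batch consumer can delete only the messages it handled successfully. The sketch below shows that pattern under stated assumptions: FakeClient, FakeMessage, and drain_batch are hypothetical in-memory stand-ins so the example runs without PostgreSQL. Only the read_batch and delete call shapes are taken from the examples above.

```ruby
# Hypothetical stand-ins so the sketch runs without PostgreSQL; only the
# read_batch/delete call shapes come from the documentation above.
FakeMessage = Struct.new(:msg_id, :payload)

class FakeClient
  attr_reader :deleted

  def initialize(messages)
    @messages = messages
    @deleted = []
  end

  # Returns up to qty messages; vt is accepted but ignored by this stub.
  def read_batch(_queue_name, vt: 30, qty: 1)
    @messages.shift(qty)
  end

  def delete(_queue_name, msg_id)
    @deleted << msg_id
  end
end

# Processes one batch, deleting only the messages whose handler succeeded.
# Returns the ids of messages that failed and were left for redelivery.
def drain_batch(client, queue_name, qty:)
  failed = []
  client.read_batch(queue_name, vt: 30, qty: qty).each do |msg|
    begin
      yield msg.payload
      client.delete(queue_name, msg.msg_id)
    rescue StandardError
      failed << msg.msg_id
    end
  end
  failed
end

client = FakeClient.new([
  FakeMessage.new(1, "ok"),
  FakeMessage.new(2, "boom"),
  FakeMessage.new(3, "ok")
])
failed = drain_batch(client, "orders", qty: 10) do |payload|
  raise "handler failed" if payload == "boom"
end
puts "deleted=#{client.deleted.inspect} failed=#{failed.inspect}"
# prints deleted=[1, 3] failed=[2]
```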

#read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>

Reads messages using grouped round-robin ordering

Messages are grouped by the first key in their JSON payload and returned in round-robin order across groups. This ensures fair processing when messages from different entities (users, orders, etc.) are in the queue.

Examples:

Fair processing across users

# Queue contains: user1_msg1, user1_msg2, user2_msg1, user3_msg1
messages = client.read_grouped_rr("tasks", vt: 30, qty: 4)
# Returns in round-robin: user1_msg1, user2_msg1, user3_msg1, user1_msg2

Prevent single entity from monopolizing worker

loop do
  messages = client.read_grouped_rr("orders", vt: 30, qty: 10)
  break if messages.empty?
  messages.each { |msg| process(msg) }
end

Parameters:

  • queue_name (String)

    name of the queue

  • vt (Integer) (defaults to: DEFAULT_VT)

    visibility timeout in seconds

  • qty (Integer) (defaults to: 1)

    number of messages to read

Returns:

  • (Array<PGMQ::Message>)

    array of messages in round-robin order



# File 'lib/pgmq/client/consumer.rb', line 176

def read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params(
      "SELECT * FROM pgmq.read_grouped_rr($1::text, $2::integer, $3::integer)",
      [queue_name, vt, qty]
    )
  end

  result.map { |row| Message.new(row) }
end
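
The round-robin ordering described for this method can be illustrated in plain Ruby. This is an in-memory sketch of the interleaving only, not the SQL that pgmq.read_grouped_rr runs. The helper name round_robin and the [group, id] pairs are hypothetical.

```ruby
# Hypothetical illustration: interleave items across groups in round-robin
# order, keeping first-seen group order and FIFO order within each group.
def round_robin(items, qty)
  # group_by preserves key insertion order, so groups keep arrival order.
  queues = items.group_by { |group, _id| group }.values
  result = []
  until result.size >= qty || queues.all?(&:empty?)
    queues.each do |q|
      break if result.size >= qty
      result << q.shift unless q.empty?
    end
  end
  result
end

queue = [
  ["user1", "user1_msg1"],
  ["user1", "user1_msg2"],
  ["user2", "user2_msg1"],
  ["user3", "user3_msg1"]
]
puts round_robin(queue, 4).map { |_group, id| id }.join(", ")
# prints user1_msg1, user2_msg1, user3_msg1, user1_msg2
```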

#read_grouped_rr_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>

Reads messages using grouped round-robin with long-polling support

Combines grouped round-robin ordering with long-polling for efficient and fair message consumption.

Examples:

Long-polling with fair ordering

messages = client.read_grouped_rr_with_poll("tasks",
  vt: 30,
  qty: 10,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)

Parameters:

  • queue_name (String)

    name of the queue

  • vt (Integer) (defaults to: DEFAULT_VT)

    visibility timeout in seconds

  • qty (Integer) (defaults to: 1)

    number of messages to read

  • max_poll_seconds (Integer) (defaults to: 5)

    maximum time to poll in seconds

  • poll_interval_ms (Integer) (defaults to: 100)

    interval between polls in milliseconds

Returns:

  • (Array<PGMQ::Message>)

    array of messages in round-robin order



# File 'lib/pgmq/client/consumer.rb', line 208

def read_grouped_rr_with_poll(
  queue_name,
  vt: DEFAULT_VT,
  qty: 1,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params(
      "SELECT * FROM pgmq.read_grouped_rr_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)",
      [queue_name, vt, qty, max_poll_seconds, poll_interval_ms]
    )
  end

  result.map { |row| Message.new(row) }
end

#read_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {}) ⇒ Array<PGMQ::Message>

Reads messages with long-polling support

Polls the queue for messages, waiting up to max_poll_seconds seconds if the queue is empty.

Examples:

messages = client.read_with_poll("orders",
  vt: 30,
  qty: 5,
  max_poll_seconds: 10,
  poll_interval_ms: 250
)

With conditional filtering

messages = client.read_with_poll("orders",
  vt: 30,
  qty: 5,
  conditional: { status: "pending" }
)

Parameters:

  • queue_name (String)

    name of the queue

  • vt (Integer) (defaults to: DEFAULT_VT)

    visibility timeout in seconds

  • qty (Integer) (defaults to: 1)

    number of messages to read

  • max_poll_seconds (Integer) (defaults to: 5)

    maximum time to poll in seconds

  • poll_interval_ms (Integer) (defaults to: 100)

    interval between polls in milliseconds

  • conditional (Hash) (defaults to: {})

    optional JSONB filter for message payload

Returns:

  • (Array<PGMQ::Message>)

    array of messages; empty when no messages arrive before polling ends


# File 'lib/pgmq/client/consumer.rb', line 125

def read_with_poll(
  queue_name,
  vt: DEFAULT_VT,
  qty: 1,
  max_poll_seconds: 5,
  poll_interval_ms: 100,
  conditional: {}
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if conditional.empty?
      conn.exec_params(
        "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)",
        [queue_name, vt, qty, max_poll_seconds, poll_interval_ms]
      )
    else
      sql = "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, " \
            "$4::integer, $5::integer, $6::jsonb)"
      conn.exec_params(
        sql,
        [queue_name, vt, qty, max_poll_seconds, poll_interval_ms, conditional.to_json]
      )
    end
  end

  result.map { |row| Message.new(row) }
end
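
Long-polling amounts to retrying a read at a fixed interval until messages arrive or the time budget runs out. In the method above the waiting is delegated to pgmq.read_with_poll in the database, so the following client-side sketch is only an illustration of the semantics. The helper poll_until and the simulated queue are hypothetical.

```ruby
# Hypothetical helper illustrating long-poll semantics: call the block
# repeatedly until it returns a non-empty array or the deadline passes.
def poll_until(max_poll_seconds:, poll_interval_ms:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_poll_seconds
  loop do
    messages = yield
    return messages unless messages.empty?
    return [] if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep(poll_interval_ms / 1000.0)
  end
end

# Simulated queue that stays empty for the first two polls.
attempts = 0
messages = poll_until(max_poll_seconds: 1, poll_interval_ms: 10) do
  attempts += 1
  attempts < 3 ? [] : ["msg"]
end
puts "got #{messages.inspect} after #{attempts} attempts"
# prints got ["msg"] after 3 attempts
```

A shorter poll_interval_ms lowers latency at the cost of more reads, while max_poll_seconds bounds how long a caller blocks on an empty queue.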