Module: PGMQ::Client::Consumer
- Included in:
- PGMQ::Client
- Defined in:
- lib/pgmq/client/consumer.rb
Overview
Single-queue message reading operations
This module handles reading messages from a single queue, including basic reads, batch reads, and long-polling for efficient message consumption.
Instance Method Summary collapse
-
#read(queue_name, vt: DEFAULT_VT, conditional: {}) ⇒ PGMQ::Message?
Reads a message from the queue.
-
#read_batch(queue_name, vt: DEFAULT_VT, qty: 1, conditional: {}) ⇒ Array<PGMQ::Message>
Reads multiple messages from the queue.
-
#read_grouped(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads messages using SQS-style grouped ordering (throughput-optimised).
-
#read_grouped_head(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads one message per FIFO group from the head of each group.
-
#read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin ordering.
-
#read_grouped_rr_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin with long-polling support.
-
#read_grouped_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads messages using SQS-style grouped ordering with long-polling support.
-
#read_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {}) ⇒ Array<PGMQ::Message>
Reads messages with long-polling support.
Instance Method Details
#read(queue_name, vt: DEFAULT_VT, conditional: {}) ⇒ PGMQ::Message?
Reads a message from the queue
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/pgmq/client/consumer.rb', line 26 def read( queue_name, vt: DEFAULT_VT, conditional: {} ) validate_queue_name!(queue_name) result = with_connection do |conn| if conditional.empty? conn.exec_params( "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)", [queue_name, vt, 1] ) else conn.exec_params( "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)", [queue_name, vt, 1, conditional.to_json] ) end end return nil if result.ntuples.zero? Message.new(result[0]) end |
#read_batch(queue_name, vt: DEFAULT_VT, qty: 1, conditional: {}) ⇒ Array<PGMQ::Message>
Reads multiple messages from the queue
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/pgmq/client/consumer.rb', line 74 def read_batch( queue_name, vt: DEFAULT_VT, qty: 1, conditional: {} ) validate_queue_name!(queue_name) result = with_connection do |conn| if conditional.empty? conn.exec_params( "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)", [queue_name, vt, qty] ) else conn.exec_params( "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)", [queue_name, vt, qty, conditional.to_json] ) end end result.map { |row| Message.new(row) } end |
#read_grouped(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads messages using SQS-style grouped ordering (throughput-optimised)
Messages are grouped by the first key in their JSON payload. Unlike round-robin, this strategy fills the requested batch from the oldest group first, then moves on to the next group only when the first is exhausted. Maximises throughput for bursty workloads at the cost of fairness across groups.
176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/pgmq/client/consumer.rb', line 176 def read_grouped(queue_name, vt: DEFAULT_VT, qty: 1) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params( "SELECT * FROM pgmq.read_grouped($1::text, $2::integer, $3::integer)", [queue_name, vt, qty] ) end result.map { |row| Message.new(row) } end |
#read_grouped_head(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Requires PGMQ v1.11.1+.
Reads one message per FIFO group from the head of each group
Returns exactly one message - the oldest visible message - from each distinct FIFO group, up to qty groups. Groups are determined by the ‘x-pgmq-group` key in the message headers (set via the `headers:` param on `produce`). Messages without that header key all land in a single implicit default group, so only one of them is returned per call.
Unlike ‘read_grouped` (which groups by the first payload key and drains one group fully before moving to the next), `read_grouped_head` surfaces the leading edge of every group in one call - useful for detecting head-of-line stalls or building per-group progress dashboards.
257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/pgmq/client/consumer.rb', line 257 def read_grouped_head(queue_name, vt: DEFAULT_VT, qty: 1) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params( "SELECT * FROM pgmq.read_grouped_head($1::text, $2::integer, $3::integer)", [queue_name, vt, qty] ) end result.map { |row| Message.new(row) } end |
#read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin ordering
Messages are grouped by the first key in their JSON payload and returned in round-robin order across groups. This ensures fair processing when messages from different entities (users, orders, etc.) are in the queue.
291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/pgmq/client/consumer.rb', line 291 def read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params( "SELECT * FROM pgmq.read_grouped_rr($1::text, $2::integer, $3::integer)", [queue_name, vt, qty] ) end result.map { |row| Message.new(row) } end |
#read_grouped_rr_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin with long-polling support
Combines grouped round-robin ordering with long-polling for efficient and fair message consumption.
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 |
# File 'lib/pgmq/client/consumer.rb', line 322 def read_grouped_rr_with_poll( queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100 ) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params( "SELECT * FROM pgmq.read_grouped_rr_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)", [queue_name, vt, qty, max_poll_seconds, poll_interval_ms] ) end result.map { |row| Message.new(row) } end |
#read_grouped_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads messages using SQS-style grouped ordering with long-polling support
Combines SQS-style throughput-first grouped ordering with long-polling. Blocks up to max_poll_seconds if the queue is empty, returning as soon as any message arrives.
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/pgmq/client/consumer.rb', line 208 def read_grouped_with_poll( queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100 ) validate_queue_name!(queue_name) result = with_connection do |conn| conn.exec_params( "SELECT * FROM pgmq.read_grouped_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)", [queue_name, vt, qty, max_poll_seconds, poll_interval_ms] ) end result.map { |row| Message.new(row) } end |
#read_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {}) ⇒ Array<PGMQ::Message>
Reads messages with long-polling support
Polls the queue for messages, waiting up to max_poll_seconds if queue is empty
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/pgmq/client/consumer.rb', line 125 def read_with_poll( queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {} ) validate_queue_name!(queue_name) result = with_connection do |conn| if conditional.empty? conn.exec_params( "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)", [queue_name, vt, qty, max_poll_seconds, poll_interval_ms] ) else sql = "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, " \ "$4::integer, $5::integer, $6::jsonb)" conn.exec_params( sql, [queue_name, vt, qty, max_poll_seconds, poll_interval_ms, conditional.to_json] ) end end result.map { |row| Message.new(row) } end |