Module: PGMQ::Client::Consumer
Included in: PGMQ::Client
Defined in: lib/pgmq/client/consumer.rb
Overview
Single-queue message reading operations
This module handles reading messages from a single queue, including basic reads, batch reads, and long-polling for efficient message consumption.
Instance Method Summary
- #read(queue_name, vt: DEFAULT_VT, conditional: {}) ⇒ PGMQ::Message?
  Reads a message from the queue.
- #read_batch(queue_name, vt: DEFAULT_VT, qty: 1, conditional: {}) ⇒ Array<PGMQ::Message>
  Reads multiple messages from the queue.
- #read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
  Reads messages using grouped round-robin ordering.
- #read_grouped_rr_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
  Reads messages using grouped round-robin with long-polling support.
- #read_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {}) ⇒ Array<PGMQ::Message>
  Reads messages with long-polling support.
Instance Method Details
#read(queue_name, vt: DEFAULT_VT, conditional: {}) ⇒ PGMQ::Message?
Reads a single message from the queue. Returns nil if no message is available.
    # File 'lib/pgmq/client/consumer.rb', line 26

    def read(
      queue_name,
      vt: DEFAULT_VT,
      conditional: {}
    )
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        if conditional.empty?
          conn.exec_params(
            "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)",
            [queue_name, vt, 1]
          )
        else
          conn.exec_params(
            "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)",
            [queue_name, vt, 1, conditional.to_json]
          )
        end
      end

      return nil if result.ntuples.zero?

      Message.new(result[0])
    end
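Because #read returns nil when the queue has nothing to deliver, callers need a nil check before touching the message. The following is a minimal sketch of that pattern. It runs against a stand-in object rather than a real database: StubClient, FakeMessage, its field names, and describe_next are all hypothetical names introduced for illustration, not part of the gem.

```ruby
# Stand-in for PGMQ::Message; the field names are assumptions for this sketch.
FakeMessage = Struct.new(:msg_id, :message)

# Hypothetical stub exposing the same #read signature as documented above.
class StubClient
  def initialize(messages)
    @messages = messages
  end

  def read(queue_name, vt: 30, conditional: {})
    # Array#shift returns nil once drained, mirroring #read on an empty queue.
    @messages.shift
  end
end

# Describes the next message, or returns :empty when #read gives nil.
def describe_next(client, queue_name)
  msg = client.read(queue_name, vt: 30)
  return :empty if msg.nil?

  "got message #{msg.msg_id}"
end

client = StubClient.new([FakeMessage.new(1, '{"order_id":42}')])
first = describe_next(client, "orders")
second = describe_next(client, "orders")
```

With a real client the same guard applies; only the stub would be replaced.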
#read_batch(queue_name, vt: DEFAULT_VT, qty: 1, conditional: {}) ⇒ Array<PGMQ::Message>
Reads up to qty messages from the queue. Returns an empty array if no messages are available.
    # File 'lib/pgmq/client/consumer.rb', line 74

    def read_batch(
      queue_name,
      vt: DEFAULT_VT,
      qty: 1,
      conditional: {}
    )
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        if conditional.empty?
          conn.exec_params(
            "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)",
            [queue_name, vt, qty]
          )
        else
          conn.exec_params(
            "SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)",
            [queue_name, vt, qty, conditional.to_json]
          )
        end
      end

      result.map { |row| Message.new(row) }
    end
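The branch on conditional.empty? decides whether the three-argument or four-argument form of pgmq.read is called, and a non-empty conditional hash is serialized with to_json before being bound as jsonb. The sketch below mirrors that branch without a database connection. build_read_query is a hypothetical helper written for this illustration and the queue name and filter are made up; it is not the gem's API.

```ruby
require "json"

# Hypothetical helper (not part of the gem): returns the SQL text and bind
# parameters that the branch in #read_batch would hand to exec_params.
def build_read_query(queue_name, vt, qty, conditional = {})
  if conditional.empty?
    ["SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer)",
     [queue_name, vt, qty]]
  else
    ["SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer, $4::jsonb)",
     [queue_name, vt, qty, conditional.to_json]]
  end
end

# No filter: three bind parameters.
plain_sql, plain_params = build_read_query("orders", 30, 10)

# With a filter: the hash becomes a JSON string in the fourth parameter.
filtered_sql, filtered_params = build_read_query("orders", 30, 10, { status: "pending" })
```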
#read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin ordering.
Messages are grouped by the first key in their JSON payload and returned in round-robin order across groups. This ensures fair processing when messages from different entities (users, orders, etc.) are in the queue.
    # File 'lib/pgmq/client/consumer.rb', line 176

    def read_grouped_rr(queue_name, vt: DEFAULT_VT, qty: 1)
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        conn.exec_params(
          "SELECT * FROM pgmq.read_grouped_rr($1::text, $2::integer, $3::integer)",
          [queue_name, vt, qty]
        )
      end

      result.map { |row| Message.new(row) }
    end
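To make the fairness property concrete, here is a purely illustrative in-memory model of round-robin interleaving across groups. It is not how the method works internally: the real ordering is computed in PostgreSQL by pgmq.read_grouped_rr. The sketch assumes the group is identified by the value of the first key in each payload, which is one reading of the description above, and round_robin is a hypothetical name.

```ruby
# Illustrative model only: interleaves payloads one per group per round.
# Assumption: a payload's group is the value of its first key.
def round_robin(payloads, qty)
  # group_by builds fresh arrays, so shifting them leaves `payloads` intact.
  groups = payloads.group_by { |payload| payload.values.first }.values
  ordered = []

  until groups.all?(&:empty?)
    groups.each { |group| ordered << group.shift unless group.empty? }
  end

  ordered.first(qty)
end

payloads = [
  { user_id: 1, n: 1 },
  { user_id: 1, n: 2 },
  { user_id: 1, n: 3 },
  { user_id: 2, n: 1 },
  { user_id: 3, n: 1 }
]

picked = round_robin(payloads, 4)
picked_users = picked.map { |payload| payload[:user_id] }
```

In this model, user 1's backlog of three messages does not starve users 2 and 3: each group contributes one message per round before any group contributes a second.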
#read_grouped_rr_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads messages using grouped round-robin with long-polling support.
Combines grouped round-robin ordering with long-polling for efficient and fair message consumption.
    # File 'lib/pgmq/client/consumer.rb', line 208

    def read_grouped_rr_with_poll(
      queue_name,
      vt: DEFAULT_VT,
      qty: 1,
      max_poll_seconds: 5,
      poll_interval_ms: 100
    )
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        conn.exec_params(
          "SELECT * FROM pgmq.read_grouped_rr_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)",
          [queue_name, vt, qty, max_poll_seconds, poll_interval_ms]
        )
      end

      result.map { |row| Message.new(row) }
    end
#read_with_poll(queue_name, vt: DEFAULT_VT, qty: 1, max_poll_seconds: 5, poll_interval_ms: 100, conditional: {}) ⇒ Array<PGMQ::Message>
Reads messages with long-polling support.
Polls the queue for messages, waiting up to max_poll_seconds seconds if the queue is empty and checking again every poll_interval_ms milliseconds.
    # File 'lib/pgmq/client/consumer.rb', line 125

    def read_with_poll(
      queue_name,
      vt: DEFAULT_VT,
      qty: 1,
      max_poll_seconds: 5,
      poll_interval_ms: 100,
      conditional: {}
    )
      validate_queue_name!(queue_name)

      result = with_connection do |conn|
        if conditional.empty?
          conn.exec_params(
            "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, $4::integer, $5::integer)",
            [queue_name, vt, qty, max_poll_seconds, poll_interval_ms]
          )
        else
          sql = "SELECT * FROM pgmq.read_with_poll($1::text, $2::integer, $3::integer, " \
                "$4::integer, $5::integer, $6::jsonb)"
          conn.exec_params(
            sql,
            [queue_name, vt, qty, max_poll_seconds, poll_interval_ms, conditional.to_json]
          )
        end
      end

      result.map { |row| Message.new(row) }
    end
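A common way to use a long-polling read is a loop that keeps asking for batches and stops once a poll comes back empty. The sketch below shows one possible shape for such a loop. It uses a stand-in client so it can run without PostgreSQL: StubPollingClient and consume_until_idle are hypothetical names, and the strings stand in for PGMQ::Message objects.

```ruby
# Hypothetical stub: hands out one prepared batch per call, then [] to
# imitate a poll that timed out with nothing to deliver.
class StubPollingClient
  def initialize(batches)
    @batches = batches
  end

  def read_with_poll(queue_name, vt: 30, qty: 1, max_poll_seconds: 5,
                     poll_interval_ms: 100, conditional: {})
    @batches.shift || []
  end
end

# Keeps long-polling until a poll returns no messages, yielding each one.
# Returns the number of messages handled.
def consume_until_idle(client, queue_name, qty: 10)
  handled = 0

  loop do
    batch = client.read_with_poll(queue_name, vt: 30, qty: qty,
                                  max_poll_seconds: 5, poll_interval_ms: 100)
    break if batch.empty?

    batch.each do |msg|
      yield msg
      handled += 1
    end
  end

  handled
end

seen = []
client = StubPollingClient.new([%w[a b], %w[c]])
count = consume_until_idle(client, "events") { |msg| seen << msg }
```

In a real worker, each handled message would also need to be deleted or archived before its visibility timeout (vt) expires; otherwise it becomes readable again.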