Module: PGMQ::Client::MultiQueue
- Included in:
- PGMQ::Client
- Defined in:
- lib/pgmq/client/multi_queue.rb
Overview
Multi-queue operations
This module handles efficient operations across multiple queues using single database queries with UNION ALL for optimal performance.
Instance Method Summary collapse
-
#pop_multi(queue_names) ⇒ PGMQ::Message?
Pops a message from multiple queues (atomic read + delete).
-
#read_multi(queue_names, vt: DEFAULT_VT, qty: 1, limit: nil) ⇒ Array<PGMQ::Message>
Reads from multiple queues in a single query.
-
#read_multi_with_poll(queue_names, vt: DEFAULT_VT, qty: 1, limit: nil, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads from multiple queues with long-polling (waits for messages).
Instance Method Details
#pop_multi(queue_names) ⇒ PGMQ::Message?
Pops a message from multiple queues (atomic read + delete)
Efficiently gets and immediately deletes the first available message from any of the specified queues. Uses a single query with UNION ALL.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/pgmq/client/multi_queue.rb', line 167 def pop_multi(queue_names) raise ArgumentError, "queue_names must be an array" unless queue_names.is_a?(Array) raise ArgumentError, "queue_names cannot be empty" if queue_names.empty? raise ArgumentError, "queue_names cannot exceed 50 queues" if queue_names.size > 50 # Validate all queue names queue_names.each { |qn| validate_queue_name!(qn) } # Build UNION ALL query for all queues union_queries = queue_names.map do |queue_name| escaped_name = queue_name.gsub("'", "''") "SELECT '#{escaped_name}'::text as queue_name, * FROM pgmq.pop('#{escaped_name}'::text)" end sql = "#{union_queries.join("\nUNION ALL\n")}\nLIMIT 1" result = with_connection do |conn| conn.exec(sql) end return nil if result.ntuples.zero? Message.new(result[0]) end |
#read_multi(queue_names, vt: DEFAULT_VT, qty: 1, limit: nil) ⇒ Array<PGMQ::Message>
Reads from multiple queues in a single query
This is the most efficient way to monitor multiple queues with a single database connection. Uses UNION ALL to read from all queues in one query.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pgmq/client/multi_queue.rb', line 43 def read_multi( queue_names, vt: DEFAULT_VT, qty: 1, limit: nil ) raise ArgumentError, "queue_names must be an array" unless queue_names.is_a?(Array) raise ArgumentError, "queue_names cannot be empty" if queue_names.empty? raise ArgumentError, "queue_names cannot exceed 50 queues" if queue_names.size > 50 # Validate all queue names (prevents SQL injection) queue_names.each { |qn| validate_queue_name!(qn) } # Build UNION ALL query for all queues # Note: Queue names are validated, so this is safe from SQL injection union_queries = queue_names.map do |queue_name| # Escape single quotes in queue name (though validation should prevent this) escaped_name = queue_name.gsub("'", "''") "SELECT '#{escaped_name}'::text as queue_name, * " \ "FROM pgmq.read('#{escaped_name}'::text, #{vt.to_i}, #{qty.to_i})" end sql = union_queries.join("\nUNION ALL\n") sql += "\nLIMIT #{limit.to_i}" if limit result = with_connection do |conn| conn.exec(sql) end result.map do |row| Message.new(row) end end |
#read_multi_with_poll(queue_names, vt: DEFAULT_VT, qty: 1, limit: nil, max_poll_seconds: 5, poll_interval_ms: 100) ⇒ Array<PGMQ::Message>
Reads from multiple queues with long-polling (waits for messages)
Efficiently polls multiple queues waiting for the first available message. This uses a single connection with periodic polling until a message arrives or the timeout is reached.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/pgmq/client/multi_queue.rb', line 113 def read_multi_with_poll( queue_names, vt: DEFAULT_VT, qty: 1, limit: nil, max_poll_seconds: 5, poll_interval_ms: 100 ) raise ArgumentError, "queue_names must be an array" unless queue_names.is_a?(Array) raise ArgumentError, "queue_names cannot be empty" if queue_names.empty? raise ArgumentError, "queue_names cannot exceed 50 queues" if queue_names.size > 50 start_time = Time.now poll_interval_seconds = poll_interval_ms / 1000.0 loop do # Try to read from any queue = read_multi(queue_names, vt: vt, qty: qty, limit: limit) return if .any? # Check timeout elapsed = Time.now - start_time break if elapsed >= max_poll_seconds # Sleep for poll interval (or remaining time, whichever is less) remaining = max_poll_seconds - elapsed sleep [poll_interval_seconds, remaining].min end [] # Return empty array on timeout end |