Module: PGMQ::Client::Producer
- Included in:
- PGMQ::Client
- Defined in:
- lib/pgmq/client/producer.rb
Overview
Message producing operations
This module handles producing messages to queues, both individual messages and batches. Users must serialize messages to JSON strings themselves.
Instance Method Summary collapse
-
#produce(queue_name, message, headers: nil, delay: 0) ⇒ String
Produces a message to a queue.
-
#produce_batch(queue_name, messages, headers: nil, delay: 0) ⇒ Array<String>
Produces multiple messages to a queue in a batch.
Instance Method Details
#produce(queue_name, message, headers: nil, delay: 0) ⇒ String
Note:
Users must serialize to JSON themselves. Higher-level frameworks should handle serialization.
Produces a message to a queue
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/pgmq/client/producer.rb', line 35 def produce( queue_name, , headers: nil, delay: 0 ) validate_queue_name!(queue_name) result = with_connection do |conn| if headers conn.exec_params( "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::jsonb, $4::integer)", [queue_name, , headers, delay] ) else conn.exec_params( "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer)", [queue_name, , delay] ) end end result[0]["send"] end |
#produce_batch(queue_name, messages, headers: nil, delay: 0) ⇒ Array<String>
Produces multiple messages to a queue in a batch
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/pgmq/client/producer.rb', line 86 def produce_batch( queue_name, , headers: nil, delay: 0 ) validate_queue_name!(queue_name) return [] if .empty? if headers && headers.length != .length raise ArgumentError, "headers array length (#{headers.length}) must match messages array length (#{.length})" end # Use PostgreSQL array parameter binding for security # PG gem will properly encode the array values result = with_connection do |conn| # Create array encoder for proper PostgreSQL array formatting encoder = PG::TextEncoder::Array.new = encoder.encode() if headers encoded_headers = encoder.encode(headers) conn.exec_params( "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::jsonb[], $4::integer)", [queue_name, , encoded_headers, delay] ) else conn.exec_params( "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::integer)", [queue_name, , delay] ) end end result.map { |row| row["send_batch"] } end |