Module: PGMQ::Client::Producer

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/producer.rb

Overview

Message producing operations

This module handles producing messages to queues, both individual messages and batches. Users must serialize messages to JSON strings themselves.

Instance Method Summary collapse

Instance Method Details

#produce(queue_name, message, headers: nil, delay: 0) ⇒ String

Note:

Users must serialize to JSON themselves. Higher-level frameworks should handle serialization.

Produces a message to a queue

Examples:

Basic produce

msg_id = client.produce("orders", '{"order_id":123,"total":99.99}')

With delay

msg_id = client.produce("orders", '{"data":"value"}', delay: 60)

With headers for routing/tracing

msg_id = client.produce("orders", '{"order_id":123}',
  headers: '{"trace_id":"abc123","priority":"high"}')

With headers and delay

msg_id = client.produce("orders", '{"order_id":123}',
  headers: '{"correlation_id":"req-456"}',
  delay: 30)

Parameters:

  • queue_name (String)

    name of the queue

  • message (String)

    message as JSON string (for PostgreSQL JSONB)

  • headers (String, nil) (defaults to: nil)

    optional headers as JSON string (for metadata, routing, tracing)

  • delay (Integer) (defaults to: 0)

    delay in seconds before message becomes visible

Returns:

  • (String)

    message ID as string



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pgmq/client/producer.rb', line 35

def produce(
  queue_name,
  message,
  headers: nil,
  delay: 0
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if headers
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::jsonb, $4::integer)",
        [queue_name, message, headers, delay]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer)",
        [queue_name, message, delay]
      )
    end
  end

  result[0]["send"]
end

#produce_batch(queue_name, messages, headers: nil, delay: 0) ⇒ Array<String>

Produces multiple messages to a queue in a batch

Examples:

Basic batch produce

ids = client.produce_batch("orders", [
  '{"order_id":1}',
  '{"order_id":2}',
  '{"order_id":3}'
])

With headers (one per message)

ids = client.produce_batch("orders",
  ['{"order_id":1}', '{"order_id":2}'],
  headers: ['{"priority":"high"}', '{"priority":"low"}'])

With headers and delay

ids = client.produce_batch("orders",
  ['{"order_id":1}', '{"order_id":2}'],
  headers: ['{"trace_id":"a"}', '{"trace_id":"b"}'],
  delay: 60)

Parameters:

  • queue_name (String)

    name of the queue

  • messages (Array<String>)

    array of message payloads as JSON strings

  • headers (Array<String>, nil) (defaults to: nil)

    optional array of headers as JSON strings (must match messages length)

  • delay (Integer) (defaults to: 0)

    delay in seconds before messages become visible

Returns:

  • (Array<String>)

    array of message IDs

Raises:

  • (ArgumentError)

    if headers array length doesn’t match messages length



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/pgmq/client/producer.rb', line 86

def produce_batch(
  queue_name,
  messages,
  headers: nil,
  delay: 0
)
  validate_queue_name!(queue_name)
  return [] if messages.empty?

  if headers && headers.length != messages.length
    raise ArgumentError,
      "headers array length (#{headers.length}) must match messages array length (#{messages.length})"
  end

  # Use PostgreSQL array parameter binding for security
  # PG gem will properly encode the array values
  result = with_connection do |conn|
    # Create array encoder for proper PostgreSQL array formatting
    encoder = PG::TextEncoder::Array.new
    encoded_messages = encoder.encode(messages)

    if headers
      encoded_headers = encoder.encode(headers)
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::jsonb[], $4::integer)",
        [queue_name, encoded_messages, encoded_headers, delay]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::integer)",
        [queue_name, encoded_messages, delay]
      )
    end
  end

  result.map { |row| row["send_batch"] }
end