Module: PGMQ::Client::Producer

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/producer.rb

Overview

Message producing operations

This module handles producing messages to queues, both individual messages and batches. Users must serialize messages to JSON strings themselves.

Instance Method Summary collapse

Instance Method Details

#produce(queue_name, message, headers: nil, delay: 0) ⇒ String

Note:

Users must serialize to JSON themselves. Higher-level frameworks should handle serialization.

Produces a message to a queue

Examples:

Basic produce

msg_id = client.produce("orders", '{"order_id":123,"total":99.99}')

With integer delay (seconds)

msg_id = client.produce("orders", '{"data":"value"}', delay: 60)

With absolute timestamp delay

msg_id = client.produce("orders", '{"data":"value"}', delay: Time.now + 3600)

With headers for routing/tracing

msg_id = client.produce("orders", '{"order_id":123}',
  headers: '{"trace_id":"abc123","priority":"high"}')

With headers and absolute timestamp delay

msg_id = client.produce("orders", '{"order_id":123}',
  headers: '{"correlation_id":"req-456"}',
  delay: Time.now + 300)

Parameters:

  • queue_name (String)

    name of the queue

  • message (String)

    message as JSON string (for PostgreSQL JSONB)

  • headers (String, nil) (defaults to: nil)

    optional headers as JSON string (for metadata, routing, tracing)

  • delay (Numeric, Time) (defaults to: 0)

    delay in seconds before message becomes visible (integer or float), or an absolute Time at which the message becomes visible, including ActiveSupport::TimeWithZone (PGMQ v1.10.0+)

Returns:

  • (String)

    message ID as string



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pgmq/client/producer.rb', line 40

def produce(
  queue_name,
  message,
  headers: nil,
  delay: 0
)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    if headers && !delay.is_a?(Numeric)
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::jsonb, $4::timestamptz)",
        [queue_name, message, headers, delay.to_time.utc.iso8601(6)]
      )
    elsif headers
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::jsonb, $4::integer)",
        [queue_name, message, headers, delay]
      )
    elsif !delay.is_a?(Numeric)
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::timestamptz)",
        [queue_name, message, delay.to_time.utc.iso8601(6)]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer)",
        [queue_name, message, delay]
      )
    end
  end

  result[0]["send"]
end

#produce_batch(queue_name, messages, headers: nil, delay: 0) ⇒ Array<String>

Produces multiple messages to a queue in a batch

Examples:

Basic batch produce

ids = client.produce_batch("orders", [
  '{"order_id":1}',
  '{"order_id":2}',
  '{"order_id":3}'
])

With headers (one per message)

ids = client.produce_batch("orders",
  ['{"order_id":1}', '{"order_id":2}'],
  headers: ['{"priority":"high"}', '{"priority":"low"}'])

With headers and delay

ids = client.produce_batch("orders",
  ['{"order_id":1}', '{"order_id":2}'],
  headers: ['{"trace_id":"a"}', '{"trace_id":"b"}'],
  delay: 60)

Parameters:

  • queue_name (String)

    name of the queue

  • messages (Array<String>)

    array of message payloads as JSON strings

  • headers (Array<String>, nil) (defaults to: nil)

    optional array of headers as JSON strings (must match messages length)

  • delay (Numeric, Time) (defaults to: 0)

    delay in seconds before messages become visible (integer or float), or an absolute Time at which the messages become visible, including ActiveSupport::TimeWithZone (PGMQ v1.10.0+)

Returns:

  • (Array<String>)

    array of message IDs

Raises:

  • (ArgumentError)

    if headers array length doesn’t match messages length



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/pgmq/client/producer.rb', line 103

def produce_batch(
  queue_name,
  messages,
  headers: nil,
  delay: 0
)
  validate_queue_name!(queue_name)
  return [] if messages.empty?

  if headers && headers.length != messages.length
    raise ArgumentError,
      "headers array length (#{headers.length}) must match messages array length (#{messages.length})"
  end

  result = with_connection do |conn|
    encoder = PG::TextEncoder::Array.new
    encoded_messages = encoder.encode(messages)

    if headers && !delay.is_a?(Numeric)
      encoded_headers = encoder.encode(headers)
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::jsonb[], $4::timestamptz)",
        [queue_name, encoded_messages, encoded_headers, delay.to_time.utc.iso8601(6)]
      )
    elsif headers
      encoded_headers = encoder.encode(headers)
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::jsonb[], $4::integer)",
        [queue_name, encoded_messages, encoded_headers, delay]
      )
    elsif !delay.is_a?(Numeric)
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::timestamptz)",
        [queue_name, encoded_messages, delay.to_time.utc.iso8601(6)]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::integer)",
        [queue_name, encoded_messages, delay]
      )
    end
  end

  result.map { |row| row["send_batch"] }
end