Module: PGMQ::Client::Topics

Included in:
PGMQ::Client
Defined in:
lib/pgmq/client/topics.rb

Overview

Note:

Requires PGMQ v1.11.0+

Topic routing operations (AMQP-like patterns)

This module provides AMQP-style topic routing for PGMQ, allowing messages to be routed to multiple queues based on pattern matching.

Topic patterns support wildcards:

  • `*` matches exactly one word between dots (e.g., `orders.*` matches `orders.new`)

  • `#` matches zero or more words (e.g., `orders.#` matches `orders.new.urgent`)
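
The wildcard rules above can be sketched in plain Ruby. This is only an illustration of the semantics as described here, following the AMQP convention that `#` may also match zero words; PGMQ performs the real matching inside PostgreSQL and its edge cases may differ. The helper names `topic_match?` and `match_words` are hypothetical and not part of this gem.

```ruby
# Illustrative matcher for dot-separated topic patterns (not the PGMQ implementation).
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern_words, key_words)
  # An exhausted pattern matches only an exhausted key.
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when "#"
    # '#' may absorb zero or more words: try every possible split point.
    (0..key_words.length).any? { |n| match_words(rest, key_words.drop(n)) }
  when "*"
    # '*' must consume exactly one word.
    !key_words.empty? && match_words(rest, key_words.drop(1))
  else
    # A literal word must match the next key word exactly.
    key_words.first == head && match_words(rest, key_words.drop(1))
  end
end

topic_match?("orders.*", "orders.new")        # => true
topic_match?("orders.*", "orders.new.urgent") # => false
topic_match?("orders.#", "orders.new.urgent") # => true
```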

Instance Method Details

#bind_topic(pattern, queue_name) ⇒ void

This method returns an undefined value.

Binds a topic pattern to a queue

Messages sent with routing keys matching this pattern will be delivered to the specified queue.

Examples:

Bind exact routing key

client.bind_topic("orders.new", "new_orders")

Bind with single-word wildcard

client.bind_topic("orders.*", "all_order_events")

Bind with multi-word wildcard

client.bind_topic("orders.#", "order_audit_log")

Parameters:

  • pattern (String)

    topic pattern with optional wildcards (* or #)

  • queue_name (String)

    name of the queue to bind



# File 'lib/pgmq/client/topics.rb', line 33

def bind_topic(pattern, queue_name)
  validate_queue_name!(queue_name)

  with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.bind_topic($1::text, $2::text)",
      [pattern, queue_name]
    )
  end

  nil
end
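
When several patterns need binding at startup, a declarative routing table keeps them in one place. The sketch below assumes only the `bind_topic(pattern, queue_name)` signature documented above; `ORDER_BINDINGS` and `apply_bindings` are hypothetical names, and nothing is assumed about how PGMQ treats a binding that already exists.

```ruby
# Hypothetical routing table: topic pattern => queue name.
ORDER_BINDINGS = {
  "orders.new" => "new_orders",
  "orders.*"   => "all_order_events",
  "orders.#"   => "order_audit_log"
}.freeze

# Applies every binding through any object that responds to #bind_topic
# (a PGMQ::Client in real use). Returns the number of bindings applied.
def apply_bindings(client, bindings)
  bindings.each { |pattern, queue_name| client.bind_topic(pattern, queue_name) }
  bindings.size
end

# In an application: apply_bindings(client, ORDER_BINDINGS)
```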

#list_topic_bindings(queue_name: nil) ⇒ Array<Hash>

Lists topic bindings, optionally filtered by queue

Examples:

List all bindings

bindings = client.list_topic_bindings
# => [{ pattern: "orders.*", queue_name: "orders", bound_at: "..." }, ...]

List bindings for specific queue

bindings = client.list_topic_bindings(queue_name: "orders")

Parameters:

  • queue_name (String, nil) (defaults to: nil)

    optional queue name to filter by

Returns:

  • (Array<Hash>)

    array of binding hashes with :pattern, :queue_name, and :bound_at



# File 'lib/pgmq/client/topics.rb', line 173

def list_topic_bindings(queue_name: nil)
  result = with_connection do |conn|
    if queue_name
      validate_queue_name!(queue_name)
      conn.exec_params(
        "SELECT pattern, queue_name, bound_at FROM pgmq.list_topic_bindings($1::text)",
        [queue_name]
      )
    else
      conn.exec("SELECT pattern, queue_name, bound_at FROM pgmq.list_topic_bindings()")
    end
  end

  result.map do |row|
    {
      pattern: row["pattern"],
      queue_name: row["queue_name"],
      bound_at: row["bound_at"]
    }
  end
end
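
The flat list returned above is often easier to read when grouped per queue. The following is a minimal sketch that works on the documented hash shape; `patterns_by_queue` is a hypothetical helper, and the sample data stands in for a real `client.list_topic_bindings` result.

```ruby
# Groups binding hashes into { queue_name => [sorted patterns] }.
def patterns_by_queue(bindings)
  bindings
    .group_by { |binding| binding[:queue_name] }
    .transform_values { |rows| rows.map { |row| row[:pattern] }.sort }
end

# Sample data in the shape documented for list_topic_bindings.
sample = [
  { pattern: "orders.#",   queue_name: "order_audit_log",  bound_at: "..." },
  { pattern: "orders.new", queue_name: "all_order_events", bound_at: "..." },
  { pattern: "orders.*",   queue_name: "all_order_events", bound_at: "..." }
]

patterns_by_queue(sample)
# => { "order_audit_log" => ["orders.#"], "all_order_events" => ["orders.*", "orders.new"] }
```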

#produce_batch_topic(routing_key, messages, headers: nil, delay: 0) ⇒ Array<Hash>

Sends multiple messages via topic routing

All messages will be delivered to all queues whose bound patterns match the routing key.

Examples:

Batch topic send

results = client.produce_batch_topic("orders.new", [
  '{"order_id":1}',
  '{"order_id":2}'
])
# => [{ queue_name: "new_orders", msg_id: "1" }, ...]

Parameters:

  • routing_key (String)

    dot-separated routing key

  • messages (Array<String>)

    array of message payloads as JSON strings

  • headers (Array<String>, nil) (defaults to: nil)

    optional array of headers as JSON strings

  • delay (Integer) (defaults to: 0)

    delay in seconds before messages become visible

Returns:

  • (Array<Hash>)

    array of hashes with :queue_name and :msg_id



# File 'lib/pgmq/client/topics.rb', line 126

def produce_batch_topic(routing_key, messages, headers: nil, delay: 0)
  return [] if messages.empty?

  if headers && headers.length != messages.length
    raise ArgumentError,
      "headers array length (#{headers.length}) must match messages array length (#{messages.length})"
  end

  result = with_connection do |conn|
    encoder = PG::TextEncoder::Array.new
    encoded_messages = encoder.encode(messages)

    if headers
      encoded_headers = encoder.encode(headers)
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch_topic($1::text, $2::jsonb[], $3::jsonb[], $4::integer)",
        [routing_key, encoded_messages, encoded_headers, delay]
      )
    elsif delay > 0
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch_topic($1::text, $2::jsonb[], $3::integer)",
        [routing_key, encoded_messages, delay]
      )
    else
      conn.exec_params(
        "SELECT * FROM pgmq.send_batch_topic($1::text, $2::jsonb[])",
        [routing_key, encoded_messages]
      )
    end
  end

  result.map do |row|
    { queue_name: row["queue_name"], msg_id: row["msg_id"] }
  end
end
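
Because `headers` must have the same length as `messages`, it can help to build both arrays from a single list of records. The sketch below does that and also groups the returned rows by queue; `build_batch` and `msg_ids_by_queue` are hypothetical helpers, and the `{ body:, headers: }` record shape is an assumption made for illustration.

```ruby
require "json"

# Turns an array of { body:, headers: } records into two parallel arrays of
# JSON strings, so the lengths always match as produce_batch_topic requires.
# Records without :headers get an empty JSON object.
def build_batch(records)
  messages = records.map { |record| JSON.generate(record.fetch(:body)) }
  headers  = records.map { |record| JSON.generate(record.fetch(:headers, {})) }
  [messages, headers]
end

# Groups the documented result rows into { queue_name => [msg_id, ...] }.
def msg_ids_by_queue(results)
  results.each_with_object(Hash.new { |hash, key| hash[key] = [] }) do |row, grouped|
    grouped[row[:queue_name]] << row[:msg_id]
  end
end

# In an application:
#   messages, headers = build_batch(records)
#   results = client.produce_batch_topic("orders.new", messages, headers: headers)
#   msg_ids_by_queue(results)
```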

#produce_topic(routing_key, message, headers: nil, delay: 0) ⇒ Integer

Sends a message via topic routing

The message will be delivered to all queues whose bound patterns match the routing key.

Examples:

Basic topic send

count = client.produce_topic("orders.new", '{"order_id":123}')

With headers and delay

count = client.produce_topic("orders.new.priority",
  '{"order_id":123}',
  headers: '{"trace_id":"abc"}',
  delay: 30)

Parameters:

  • routing_key (String)

    dot-separated routing key (e.g., "orders.new.priority")

  • message (String)

    message as JSON string

  • headers (String, nil) (defaults to: nil)

    optional headers as JSON string

  • delay (Integer) (defaults to: 0)

    delay in seconds before message becomes visible

Returns:

  • (Integer)

    count of queues the message was delivered to



# File 'lib/pgmq/client/topics.rb', line 86

def produce_topic(routing_key, message, headers: nil, delay: 0)
  result = with_connection do |conn|
    if headers
      conn.exec_params(
        "SELECT pgmq.send_topic($1::text, $2::jsonb, $3::jsonb, $4::integer)",
        [routing_key, message, headers, delay]
      )
    elsif delay > 0
      conn.exec_params(
        "SELECT pgmq.send_topic($1::text, $2::jsonb, $3::integer)",
        [routing_key, message, delay]
      )
    else
      conn.exec_params(
        "SELECT pgmq.send_topic($1::text, $2::jsonb)",
        [routing_key, message]
      )
    end
  end

  result[0]["send_topic"].to_i
end
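
Since the return value is the number of queues reached, a caller can treat a count of zero as a message that matched no binding. The wrapper below is a sketch of that idea, assuming only the `produce_topic` signature documented above; `UnroutableMessageError` and `produce_topic!` are hypothetical and not defined by this gem.

```ruby
# Hypothetical error class for this sketch; the gem does not define it.
class UnroutableMessageError < StandardError; end

# Sends via any object responding to #produce_topic (a PGMQ::Client in real
# use) and raises when the message was delivered to zero queues.
def produce_topic!(client, routing_key, message, **options)
  count = client.produce_topic(routing_key, message, **options)
  raise UnroutableMessageError, "no queue bound for #{routing_key.inspect}" if count.zero?

  count
end

# In an application:
#   produce_topic!(client, "orders.new", '{"order_id":123}', delay: 30)
```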

#test_routing(routing_key) ⇒ Array<Hash>

Tests which queues a routing key would match

Useful for debugging topic routing configurations.

Examples:

Test routing

matches = client.test_routing("orders.new.priority")
# => [{ pattern: "orders.#", queue_name: "order_audit_log" }, ...]

Parameters:

  • routing_key (String)

    routing key to test

Returns:

  • (Array<Hash>)

    array of matched bindings with :pattern and :queue_name



# File 'lib/pgmq/client/topics.rb', line 205

def test_routing(routing_key)
  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pattern, queue_name FROM pgmq.test_routing($1::text)",
      [routing_key]
    )
  end

  result.map do |row|
    { pattern: row["pattern"], queue_name: row["queue_name"] }
  end
end
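
One way to use this for debugging is to compare the matched queues against the queues a key is expected to reach. The helper below is a sketch under that assumption; `missing_queues` is a hypothetical name, and it relies only on the documented `test_routing` return shape.

```ruby
# Returns the expected queue names that the routing key would NOT reach,
# using any object that responds to #test_routing (a PGMQ::Client in real use).
def missing_queues(client, routing_key, expected_queues)
  matched = client.test_routing(routing_key).map { |binding| binding[:queue_name] }
  expected_queues - matched.uniq
end

# In an application:
#   missing_queues(client, "orders.new", %w[new_orders order_audit_log])
#   # an empty array means every expected queue is covered
```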

#unbind_topic(pattern, queue_name) ⇒ Boolean

Unbinds a topic pattern from a queue

Examples:

client.unbind_topic("orders.new", "new_orders")

Parameters:

  • pattern (String)

    topic pattern to unbind

  • queue_name (String)

    name of the queue to unbind from

Returns:

  • (Boolean)

    true if the binding was removed, false if it didn’t exist



# File 'lib/pgmq/client/topics.rb', line 54

def unbind_topic(pattern, queue_name)
  validate_queue_name!(queue_name)

  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.unbind_topic($1::text, $2::text)",
      [pattern, queue_name]
    )
  end

  result[0]["unbind_topic"] == "t"
end

#validate_routing_key(routing_key) ⇒ Boolean

Validates a routing key

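Routing keys are dot-separated words; wildcards are not allowed. PGMQ raises an error for an invalid key, so this method rescues that error and returns false when the error message mentions invalid characters. Any other connection error is re-raised.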

Examples:

client.validate_routing_key("orders.new.priority")  # => true
client.validate_routing_key("orders.*")             # => false (wildcards not allowed)

Parameters:

  • routing_key (String)

    routing key to validate

Returns:

  • (Boolean)

    true if valid, false if invalid



# File 'lib/pgmq/client/topics.rb', line 229

def validate_routing_key(routing_key)
  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.validate_routing_key($1::text)",
      [routing_key]
    )
  end

  result[0]["validate_routing_key"] == "t"
rescue PGMQ::Errors::ConnectionError => e
  # PGMQ raises an error for invalid routing keys
  return false if e.message.include?("invalid characters")

  raise
end

#validate_topic_pattern(pattern) ⇒ Boolean

Validates a topic pattern

Topic patterns can include wildcards: * (single word) or # (zero or more words).

Examples:

client.validate_topic_pattern("orders.*")    # => true
client.validate_topic_pattern("orders.#")    # => true
client.validate_topic_pattern("orders.new")  # => true

Parameters:

  • pattern (String)

    topic pattern to validate

Returns:

  • (Boolean)

    true if valid



# File 'lib/pgmq/client/topics.rb', line 256

def validate_topic_pattern(pattern)
  result = with_connection do |conn|
    conn.exec_params(
      "SELECT pgmq.validate_topic_pattern($1::text)",
      [pattern]
    )
  end

  result[0]["validate_topic_pattern"] == "t"
end