Module: Pgbus::Client::NotifyStream
Included in: Pgbus::Client
Defined in: lib/pgbus/client/notify_stream.rb
Overview
Fire-and-forget Postgres NOTIFY for ephemeral stream broadcasts. No PGMQ queue is created: the payload travels only over a Postgres NOTIFY channel, whose name follows the same convention as the channel used by PGMQ's trigger:
pgmq.q_<full_queue_name>.INSERT
Subscribers already LISTEN on this channel via the Streamer’s Listener. When a subscriber is connected, the StreamEventDispatcher receives the NOTIFY and fans out the payload. When no subscriber is connected, the NOTIFY is silently discarded by Postgres — no queue, no storage, no orphan tables.
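As an illustration of the channel convention, the sketch below builds the channel name and the matching LISTEN statement. The helper names and the `pgbus_chat_room_42` queue name are hypothetical and not part of Pgbus. The quoting step reflects that LISTEN takes a SQL identifier, so a name containing dots and uppercase letters has to be double-quoted, whereas `pg_notify` receives the channel as a plain text argument.

```ruby
# Hypothetical helpers for illustration only; not part of the Pgbus API.

# Builds the NOTIFY channel name for an already-prefixed, sanitized queue name.
def stream_channel(full_queue_name)
  "pgmq.q_#{full_queue_name}.INSERT"
end

# Builds a LISTEN statement with the channel quoted as a SQL identifier.
# Embedded double quotes are doubled, as SQL identifier quoting requires.
def listen_sql(channel)
  %(LISTEN "#{channel.gsub('"', '""')}")
end

channel = stream_channel("pgbus_chat_room_42")
puts channel              # pgmq.q_pgbus_chat_room_42.INSERT
puts listen_sql(channel)  # LISTEN "pgmq.q_pgbus_chat_room_42.INSERT"
```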
The payload is JSON-serialized (a String is passed through unchanged) into NOTIFY's optional payload parameter, which must be shorter than 8000 bytes in a default Postgres configuration. A broadcast that exceeds this limit raises a PG::ProgramLimitExceeded error; callers that need larger payloads should use durable mode, which inserts into PGMQ.
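A caller that wants to choose between ephemeral and durable mode up front could check the serialized size before broadcasting. The following is a minimal sketch under stated assumptions: the constant and both helper names are hypothetical, and the 8000-byte figure assumes a default Postgres build. The check uses `bytesize` rather than `length` because the limit applies to bytes, not characters.

```ruby
require "json"

# Assumed limit: a default Postgres build rejects NOTIFY payloads of
# 8000 bytes or more.
NOTIFY_PAYLOAD_LIMIT = 8000

# Mirrors the serialization in notify_stream: a String passes through
# unchanged, anything else is JSON-encoded.
def serialize_stream_payload(payload)
  payload.is_a?(String) ? payload : JSON.generate(payload)
end

# Hypothetical guard: true when the serialized payload is small enough
# to be sent through NOTIFY.
def fits_in_notify?(payload)
  serialize_stream_payload(payload).bytesize < NOTIFY_PAYLOAD_LIMIT
end

puts fits_in_notify?({ event: "typing", user_id: 7 }) # true
puts fits_in_notify?("a" * 7999)                      # true
puts fits_in_notify?("a" * 8000)                      # false
```

A payload made of multi-byte characters can exceed the limit well before it reaches 8000 characters, which is why the byte count is the value to test.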
Instance Method Summary
Instance Method Details
#notify_stream(stream_name, payload) ⇒ Object
# File 'lib/pgbus/client/notify_stream.rb', line 21

    def notify_stream(stream_name, payload)
      full_name = config.queue_name(stream_name)
      sanitized = QueueNameValidator.sanitize!(full_name)
      channel = "pgmq.q_#{sanitized}.INSERT"
      json = payload.is_a?(String) ? payload : JSON.generate(payload)

      Instrumentation.instrument("pgbus.stream.notify", stream: stream_name, bytes: json.bytesize) do
        synchronized do
          @pgmq.__send__(:with_connection) do |conn|
            conn.exec_params("SELECT pg_notify($1, $2)", [channel, json])
          end
        end
      end
    end
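To make the effect of the method easier to see without a database, the sketch below restates its core against a stand-in connection that records calls instead of executing them. `RecordingConnection`, `notify_stream_sketch`, and the `pgbus_chat` queue name are hypothetical. The queue-name prefixing, sanitization, instrumentation, and locking done by the real method are deliberately left out.

```ruby
require "json"

# Stand-in for a PG connection: records each call instead of talking to Postgres.
class RecordingConnection
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exec_params(sql, params)
    @calls << [sql, params]
  end
end

# Hypothetical restatement of the core of notify_stream. It expects a queue
# name that is already prefixed and sanitized.
def notify_stream_sketch(conn, full_queue_name, payload)
  channel = "pgmq.q_#{full_queue_name}.INSERT"
  json = payload.is_a?(String) ? payload : JSON.generate(payload)
  # Channel and payload are bound as parameters, never interpolated into SQL.
  conn.exec_params("SELECT pg_notify($1, $2)", [channel, json])
end

conn = RecordingConnection.new
notify_stream_sketch(conn, "pgbus_chat", { event: "typing" })
notify_stream_sketch(conn, "pgbus_chat", "already-encoded")

conn.calls.each { |sql, params| puts "#{sql} #{params.inspect}" }
```

The second call shows that a String payload is sent as-is, so a caller passing a String is responsible for making sure subscribers can parse it.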