Class: Pgbus::Streams::Stream
- Inherits:
-
Object
- Object
- Pgbus::Streams::Stream
- Defined in:
- lib/pgbus/streams.rb
Overview
A handle on a single logical stream. The name can be any string, an
object responding to to_gid_param, or an array of streamables (which
are joined with colons — turbo-rails-compatible).
Stream is not a singleton — callers create instances ad hoc, but the underlying queue is created lazily and only once per process per name.
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
-
.name_from(streamables) ⇒ Object
Mirrors
Turbo::Streams::StreamName#stream_name_from. -
.validate_name_length!(name, streamables) ⇒ Object
Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting
Pgbus.stream_key) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.
Instance Method Summary collapse
-
#broadcast(payload, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil, target: nil) ⇒ Object
Broadcasts a Turbo Stream HTML payload through the pgbus streamer.
-
#broadcast_render(target:, action: :replace, renderable: nil, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil) ⇒ Object
Renders a renderable (a Phlex component, a ViewComponent, or a pre-rendered HTML string) into a complete
<turbo-stream>action tag and broadcasts it atomically — the render and the broadcast in one call. - #current_msg_id ⇒ Object
- #durable? ⇒ Boolean
- #ensure! ⇒ Object
-
#initialize(streamables, client: Pgbus.client, durable: true) ⇒ Stream
constructor
A new instance of Stream.
-
#presence ⇒ Object
Returns a Pgbus::Streams::Presence handle for this stream.
- #read_after(after_id:, limit: 500) ⇒ Object
Constructor Details
#initialize(streamables, client: Pgbus.client, durable: true) ⇒ Stream
Returns a new instance of Stream.
65 66 67 68 69 70 71 72 |
# File 'lib/pgbus/streams.rb', line 65 def initialize(streamables, client: Pgbus.client, durable: true) @name = self.class.name_from(streamables) self.class.validate_name_length!(@name, streamables) @client = client @durable = durable @ensured = false @ensure_mutex = Mutex.new end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
63 64 65 |
# File 'lib/pgbus/streams.rb', line 63 def name @name end |
Class Method Details
.name_from(streamables) ⇒ Object
Mirrors Turbo::Streams::StreamName#stream_name_from. Strings pass
through; objects with to_gid_param or to_param are coerced; arrays
are joined with :. The result is suitable both as a logical stream
identifier and as the input to QueueNameValidator (after sanitisation).
223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/pgbus/streams.rb', line 223 def self.name_from(streamables) if streamables.is_a?(Array) streamables.map { |s| name_from(s) }.join(":") elsif streamables.respond_to?(:to_gid_param) streamables.to_gid_param elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol) streamables.to_param else streamables.to_s end end |
.validate_name_length!(name, streamables) ⇒ Object
Enforces the pgbus queue-name budget at the Stream-construction
boundary so a forgotten call site fails with an actionable error
(pointing at the offending streamables and suggesting
Pgbus.stream_key) instead of an opaque QueueNameValidator
failure three frames deep in Client#ensure_stream_queue.
The budget is computed from config.queue_prefix at call time
so apps that override the prefix get the correct limit. Does not
mutate the name — silent truncation is a footgun for
multi-tenant apps where collisions would mix broadcasts across
records. Callers who need a short, safe identifier should use
Pgbus.stream_key(...) or include Pgbus::Streams::Streamable
on their ActiveRecord models.
248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/pgbus/streams.rb', line 248 def self.validate_name_length!(name, streamables) budget = Key.queue_name_budget return if name.length <= budget raise StreamNameTooLong, "Stream name #{name.inspect} is #{name.length} chars, " \ "exceeds pgbus budget of #{budget} " \ "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \ "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \ "Streamables: #{streamables.inspect}. " \ "Use Pgbus.stream_key(*streamables) to produce a safe short name, " \ "or include Pgbus::Streams::Streamable on the model." end |
Instance Method Details
#broadcast(payload, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil, target: nil) ⇒ Object
Broadcasts a Turbo Stream HTML payload through the pgbus streamer.
PGMQ's message column is JSONB, so raw HTML strings can't be passed
directly. We wrap as {"html": "..."} on the way in and unwrap in
Pgbus::Web::Streamer::StreamEventDispatcher before delivering to the SSE client.
Callers pass a plain HTML string; the wrapping is an implementation
detail.
Transactional semantics: if this call is made inside an open ActiveRecord transaction, the PGMQ insert is deferred to an after_commit callback. If the transaction rolls back, the broadcast silently drops — clients never see the change that the database never persisted. This is the feature no other Rails real-time stack (including turbo-rails over ActionCable) can offer: the broadcast and the data mutation are atomic with respect to each other. Returns the assigned msg_id when sent synchronously, nil when deferred (the id isn't known until the after_commit callback runs).
Audience filtering: pass visible_to: with a filter label (a
Symbol previously registered via Pgbus::Streams.filters.register)
to restrict delivery to connections whose authorize-hook context
satisfies the predicate. The label travels with the broadcast
through PGMQ; the predicate itself lives in-process on the
subscriber side and is evaluated per-connection by the Dispatcher.
Per-broadcast durable: overrides the stream-level default for a
single broadcast. nil (the default) defers to the stream's own
durable? setting; true/false flip the mode for this call only.
Actor-echo suppression: pass exclude: with the broadcaster's own
SSE connection id (surfaced to the page as
<meta name="pgbus-connection-id"> / the element's connection-id
attribute, sent back on the action request as the
X-Pgbus-Connection header). The dispatcher skips delivery to that
one connection, so the actor doesn't receive the echo of its own
broadcast — it already applied the change via the action's HTTP
response. Everyone else gets the broadcast. A nil/blank exclude
is a no-op (the common path).
Typed SSE event name: pass event: to set the SSE event: field
on the delivered frame (e.g. event: "presence", event: "reactive") while keeping the payload a Turbo Stream. Clients that
care can route on the typed event without sniffing the HTML;
default consumers still receive the standard turbo-stream/message
path. The default (nil or "turbo-stream") is not carried on the
wire — it's the implicit default the dispatcher applies.
High-frequency coalescing: pass coalesce: (a window in milliseconds,
or true for the default window) together with target: to batch
rapid broadcasts to the same (stream, target) and publish only the
latest within the window. Superseded frames never reach the bus.
Last-write-wins, so this is only safe for idempotent replace/update
of a stable target (the high-frequency case: cursors, typing,
progress). Returns nil — the actual broadcast is deferred to the
coalescer's flush. See issue #171 and Pgbus::Streams::Coalescer.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/pgbus/streams.rb', line 129 def broadcast(payload, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil, target: nil) if coalesce return coalesce_broadcast( payload, coalesce: coalesce, target: target, visible_to: visible_to, durable: durable, exclude: exclude, event: event ) end wrapped = { "html" => payload.to_s } wrapped["visible_to"] = visible_to.to_s if visible_to wrapped["exclude"] = exclude.to_s if exclude && !exclude.to_s.empty? wrapped["event"] = event.to_s if event && event.to_s != DEFAULT_SSE_EVENT use_durable = durable.nil? ? @durable : durable return broadcast_ephemeral(wrapped) unless use_durable ensure_queue! transaction = current_open_transaction instrument_payload = { stream: @name, visible_to: visible_to, deferred: !transaction.nil?, bytes: wrapped["html"].bytesize } Instrumentation.instrument("pgbus.stream.broadcast", instrument_payload) do if transaction transaction.after_commit { @client.(@name, wrapped) } nil else @client.(@name, wrapped) end end end |
#broadcast_render(target:, action: :replace, renderable: nil, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil) ⇒ Object
Renders a renderable (a Phlex component, a ViewComponent, or a
pre-rendered HTML string) into a complete <turbo-stream> action
tag and broadcasts it atomically — the render and the broadcast in
one call. This removes the #1 footgun in server-driven UI: building
the off-request render context and the turbo-stream wrapper by hand
at every call site.
Pgbus.stream("chat", room).broadcast_render(
renderable: Chat::Message.new(chat_message: msg),
action: :append, target: "chat-messages-#{room}",
exclude: connection_id # composes with #165
)
action defaults to :replace. target is required. exclude:,
visible_to:, and durable: are forwarded to #broadcast unchanged,
so actor-echo suppression and audience filtering compose. Returns
whatever #broadcast returns (msg_id, or nil when deferred to
after_commit inside a transaction).
See Pgbus::Streams::Renderer for the renderable-resolution order.
Components that need URL helpers or a full view context should be
rendered by the app (which has the request context) and the
resulting string passed as renderable:.
191 192 193 194 195 196 |
# File 'lib/pgbus/streams.rb', line 191 def broadcast_render(target:, action: :replace, renderable: nil, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil) html = Renderer.turbo_stream_tag(action: action, target: target, renderable: renderable) broadcast(html, visible_to: visible_to, durable: durable, exclude: exclude, event: event, coalesce: coalesce, target: target) end |
#current_msg_id ⇒ Object
198 199 200 |
# File 'lib/pgbus/streams.rb', line 198 def current_msg_id @client.stream_current_msg_id(@name) end |
#durable? ⇒ Boolean
74 75 76 |
# File 'lib/pgbus/streams.rb', line 74 def durable? @durable end |
#ensure! ⇒ Object
206 207 208 209 |
# File 'lib/pgbus/streams.rb', line 206 def ensure! ensure_queue! self end |
#presence ⇒ Object
Returns a Pgbus::Streams::Presence handle for this stream. The Presence object exposes join/leave/touch/members/sweep! for tracking who is currently subscribed. See lib/pgbus/streams/presence.rb for the API.
215 216 217 |
# File 'lib/pgbus/streams.rb', line 215 def presence @presence ||= Presence.new(self) end |
#read_after(after_id:, limit: 500) ⇒ Object
202 203 204 |
# File 'lib/pgbus/streams.rb', line 202 def read_after(after_id:, limit: 500) @client.read_after(@name, after_id: after_id, limit: limit) end |