Class: Pgbus::Streams::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/pgbus/streams.rb

Overview

A handle on a single logical stream. The name can be any string, an object responding to ‘to_gid_param`, or an array of streamables (which are joined with colons — turbo-rails-compatible).

Stream is not a singleton — callers create instances ad hoc, but the underlying queue is created lazily and only once per process per name.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(streamables, client: Pgbus.client, durable: true) ⇒ Stream

Returns a new instance of Stream.



65
66
67
68
69
70
71
72
# File 'lib/pgbus/streams.rb', line 65

def initialize(streamables, client: Pgbus.client, durable: true)
  @name = self.class.name_from(streamables)
  self.class.validate_name_length!(@name, streamables)
  @client = client
  @durable = durable
  @ensured = false
  @ensure_mutex = Mutex.new
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



63
64
65
# File 'lib/pgbus/streams.rb', line 63

def name
  @name
end

Class Method Details

.name_from(streamables) ⇒ Object

Mirrors ‘Turbo::Streams::StreamName#stream_name_from`. Strings pass through; objects with `to_gid_param` or `to_param` are coerced; arrays are joined with `:`. The result is suitable both as a logical stream identifier and as the input to QueueNameValidator (after sanitisation).



223
224
225
226
227
228
229
230
231
232
233
# File 'lib/pgbus/streams.rb', line 223

def self.name_from(streamables)
  if streamables.is_a?(Array)
    streamables.map { |s| name_from(s) }.join(":")
  elsif streamables.respond_to?(:to_gid_param)
    streamables.to_gid_param
  elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol)
    streamables.to_param
  else
    streamables.to_s
  end
end

.validate_name_length!(name, streamables) ⇒ Object

Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting ‘Pgbus.stream_key`) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.

The budget is computed from ‘config.queue_prefix` at call time so apps that override the prefix get the correct limit. Does not mutate the name — silent truncation is a footgun for multi-tenant apps where collisions would mix broadcasts across records. Callers who need a short, safe identifier should use `Pgbus.stream_key(…)` or include `Pgbus::Streams::Streamable` on their ActiveRecord models.

Raises:



248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/pgbus/streams.rb', line 248

def self.validate_name_length!(name, streamables)
  budget = Key.queue_name_budget
  return if name.length <= budget

  raise StreamNameTooLong,
        "Stream name #{name.inspect} is #{name.length} chars, " \
        "exceeds pgbus budget of #{budget} " \
        "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \
        "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \
        "Streamables: #{streamables.inspect}. " \
        "Use Pgbus.stream_key(*streamables) to produce a safe short name, " \
        "or include Pgbus::Streams::Streamable on the model."
end

Instance Method Details

#broadcast(payload, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil, target: nil) ⇒ Object

Broadcasts a Turbo Stream HTML payload through the pgbus streamer. PGMQ’s ‘message` column is JSONB, so raw HTML strings can’t be passed directly. We wrap as ‘“…”` on the way in and unwrap in Pgbus::Web::Streamer::StreamEventDispatcher before delivering to the SSE client. Callers pass a plain HTML string; the wrapping is an implementation detail.

Transactional semantics: if this call is made inside an open ActiveRecord transaction, the PGMQ insert is deferred to an after_commit callback. If the transaction rolls back, the broadcast silently drops — clients never see the change that the database never persisted. This is the feature no other Rails real-time stack (including turbo-rails over ActionCable) can offer: the broadcast and the data mutation are atomic with respect to each other. Returns the assigned msg_id when sent synchronously, nil when deferred (the id isn’t known until the after_commit callback runs).

Audience filtering: pass ‘visible_to:` with a filter label (a Symbol previously registered via Pgbus::Streams.filters.register) to restrict delivery to connections whose authorize-hook context satisfies the predicate. The label travels with the broadcast through PGMQ; the predicate itself lives in-process on the subscriber side and is evaluated per-connection by the Dispatcher. Per-broadcast `durable:` overrides the stream-level default for a single broadcast. `nil` (the default) defers to the stream’s own ‘durable?` setting; `true`/`false` flip the mode for this call only.

Actor-echo suppression: pass ‘exclude:` with the broadcaster’s own SSE connection id (surfaced to the page as ‘<meta name=“pgbus-connection-id”>` / the element’s ‘connection-id` attribute, sent back on the action request as the `X-Pgbus-Connection` header). The dispatcher skips delivery to that one connection, so the actor doesn’t receive the echo of its own broadcast — it already applied the change via the action’s HTTP response. Everyone else gets the broadcast. A nil/blank ‘exclude` is a no-op (the common path). Typed SSE event name: pass `event:` to set the SSE `event:` field on the delivered frame (e.g. `event: “presence”`, `event: “reactive”`) while keeping the payload a Turbo Stream. Clients that care can route on the typed event without sniffing the HTML; default consumers still receive the standard turbo-stream/`message` path. The default (`nil` or `“turbo-stream”`) is not carried on the wire — it’s the implicit default the dispatcher applies. High-frequency coalescing: pass ‘coalesce:` (a window in milliseconds, or `true` for the default window) together with `target:` to batch rapid broadcasts to the same (stream, target) and publish only the latest within the window. Superseded frames never reach the bus. Last-write-wins, so this is only safe for idempotent replace/update of a stable target (the high-frequency case: cursors, typing, progress). Returns nil — the actual broadcast is deferred to the coalescer’s flush. See issue #171 and Pgbus::Streams::Coalescer.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/pgbus/streams.rb', line 129

def broadcast(payload, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil, target: nil)
  if coalesce
    return coalesce_broadcast(
      payload,
      coalesce: coalesce,
      target: target,
      visible_to: visible_to,
      durable: durable,
      exclude: exclude,
      event: event
    )
  end

  wrapped = { "html" => payload.to_s }
  wrapped["visible_to"] = visible_to.to_s if visible_to
  wrapped["exclude"] = exclude.to_s if exclude && !exclude.to_s.empty?
  wrapped["event"] = event.to_s if event && event.to_s != DEFAULT_SSE_EVENT

  use_durable = durable.nil? ? @durable : durable
  return broadcast_ephemeral(wrapped) unless use_durable

  ensure_queue!
  transaction = current_open_transaction
  instrument_payload = {
    stream: @name,
    visible_to: visible_to,
    deferred: !transaction.nil?,
    bytes: wrapped["html"].bytesize
  }
  Instrumentation.instrument("pgbus.stream.broadcast", instrument_payload) do
    if transaction
      transaction.after_commit { @client.send_message(@name, wrapped) }
      nil
    else
      @client.send_message(@name, wrapped)
    end
  end
end

#broadcast_render(target:, action: :replace, renderable: nil, visible_to: nil, durable: nil, exclude: nil, event: nil, coalesce: nil) ⇒ Object

Renders a renderable (a Phlex component, a ViewComponent, or a pre-rendered HTML string) into a complete ‘<turbo-stream>` action tag and broadcasts it atomically — the render and the broadcast in one call. This removes the #1 footgun in server-driven UI: building the off-request render context and the turbo-stream wrapper by hand at every call site.

Pgbus.stream("chat", room).broadcast_render(
  renderable: Chat::Message.new(chat_message: msg),
  action: :append, target: "chat-messages-#{room}",
  exclude: connection_id   # composes with #165
)

‘action` defaults to :replace. `target` is required. `exclude:`, `visible_to:`, and `durable:` are forwarded to #broadcast unchanged, so actor-echo suppression and audience filtering compose. Returns whatever #broadcast returns (msg_id, or nil when deferred to after_commit inside a transaction).

See Pgbus::Streams::Renderer for the renderable-resolution order. Components that need URL helpers or a full view context should be rendered by the app (which has the request context) and the resulting string passed as ‘renderable:`.



191
192
193
194
195
196
# File 'lib/pgbus/streams.rb', line 191

def broadcast_render(target:, action: :replace, renderable: nil, visible_to: nil, durable: nil, exclude: nil,
                     event: nil, coalesce: nil)
  html = Renderer.turbo_stream_tag(action: action, target: target, renderable: renderable)
  broadcast(html, visible_to: visible_to, durable: durable, exclude: exclude, event: event,
                  coalesce: coalesce, target: target)
end

#current_msg_idObject



198
199
200
# File 'lib/pgbus/streams.rb', line 198

def current_msg_id
  @client.stream_current_msg_id(@name)
end

#durable?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/pgbus/streams.rb', line 74

def durable?
  @durable
end

#ensure!Object



206
207
208
209
# File 'lib/pgbus/streams.rb', line 206

def ensure!
  ensure_queue!
  self
end

#presenceObject

Returns a Pgbus::Streams::Presence handle for this stream. The Presence object exposes join/leave/touch/members/sweep! for tracking who is currently subscribed. See lib/pgbus/streams/presence.rb for the API.



215
216
217
# File 'lib/pgbus/streams.rb', line 215

def presence
  @presence ||= Presence.new(self)
end

#read_after(after_id:, limit: 500) ⇒ Object



202
203
204
# File 'lib/pgbus/streams.rb', line 202

def read_after(after_id:, limit: 500)
  @client.read_after(@name, after_id: after_id, limit: limit)
end