Class: Pgbus::Streams::Stream
- Inherits: Object
  - Object
    - Pgbus::Streams::Stream
- Defined in: lib/pgbus/streams.rb
Overview
A handle on a single logical stream. The name can be any string, an object responding to `to_gid_param`, or an array of streamables (which are joined with colons, for turbo-rails compatibility).
Stream is not a singleton — callers create instances ad hoc, but the underlying queue is created lazily and only once per process per name.
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Class Method Summary
- .name_from(streamables) ⇒ Object
  Mirrors `Turbo::Streams::StreamName#stream_name_from`.
- .validate_name_length!(name, streamables) ⇒ Object
  Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting `Pgbus.stream_key`) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.
Instance Method Summary
- #broadcast(payload, visible_to: nil) ⇒ Object
  Broadcasts a Turbo Stream HTML payload through the pgbus streamer.
- #current_msg_id ⇒ Object
- #ensure! ⇒ Object
- #initialize(streamables, client: Pgbus.client) ⇒ Stream (constructor)
  A new instance of Stream.
- #presence ⇒ Object
  Returns a Pgbus::Streams::Presence handle for this stream.
- #read_after(after_id:, limit: 500) ⇒ Object
Constructor Details
#initialize(streamables, client: Pgbus.client) ⇒ Stream
Returns a new instance of Stream.
    # File 'lib/pgbus/streams.rb', line 40
    def initialize(streamables, client: Pgbus.client)
      @name = self.class.name_from(streamables)
      self.class.validate_name_length!(@name, streamables)
      @client = client
      @ensured = false
      @ensure_mutex = Mutex.new
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/pgbus/streams.rb', line 38
    def name
      @name
    end
Class Method Details
.name_from(streamables) ⇒ Object
Mirrors `Turbo::Streams::StreamName#stream_name_from`. Strings pass through; objects with `to_gid_param` or `to_param` are coerced; arrays are joined with `:`. The result is suitable both as a logical stream identifier and as the input to QueueNameValidator (after sanitisation).
    # File 'lib/pgbus/streams.rb', line 109
    def self.name_from(streamables)
      if streamables.is_a?(Array)
        streamables.map { |s| name_from(s) }.join(":")
      elsif streamables.respond_to?(:to_gid_param)
        streamables.to_gid_param
      elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol)
        streamables.to_param
      else
        streamables.to_s
      end
    end
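The coercion rules above can be exercised without loading pgbus. The following is a minimal sketch that copies the method body into a standalone module; `StreamNameSketch` and `FakeRoom` are hypothetical names introduced only for this illustration, and `FakeRoom#to_gid_param` returns a made-up string rather than a real GlobalID.

```ruby
# Standalone copy of the coercion rules from `name_from`, for illustration only.
module StreamNameSketch
  def self.name_from(streamables)
    if streamables.is_a?(Array)
      # Arrays are resolved element by element, then joined with colons.
      streamables.map { |s| name_from(s) }.join(":")
    elsif streamables.respond_to?(:to_gid_param)
      streamables.to_gid_param
    elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol)
      streamables.to_param
    else
      streamables.to_s
    end
  end
end

# Hypothetical stand-in for a model that exposes a GlobalID param.
FakeRoom = Struct.new(:id) do
  def to_gid_param
    "gid-room-#{id}"
  end
end

plain  = StreamNameSketch.name_from("chat")
nested = StreamNameSketch.name_from([FakeRoom.new(7), :messages])
deep   = StreamNameSketch.name_from(["a", ["b", 3]])
```

Because arrays recurse, nested arrays flatten into a single colon-joined name.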
.validate_name_length!(name, streamables) ⇒ Object
Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting `Pgbus.stream_key`) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.
The budget is computed from `config.queue_prefix` at call time so apps that override the prefix get the correct limit. Does not mutate the name: silent truncation is a footgun for multi-tenant apps where collisions would mix broadcasts across records. Callers who need a short, safe identifier should use `Pgbus.stream_key(…)` or include `Pgbus::Streams::Streamable` on their ActiveRecord models.
    # File 'lib/pgbus/streams.rb', line 134
    def self.validate_name_length!(name, streamables)
      budget = Key.queue_name_budget
      return if name.length <= budget

      raise StreamNameTooLong,
            "Stream name #{name.inspect} is #{name.length} chars, " \
            "exceeds pgbus budget of #{budget} " \
            "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \
            "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \
            "Streamables: #{streamables.inspect}. " \
            "Use Pgbus.stream_key(*streamables) to produce a safe short name, " \
            "or include Pgbus::Streams::Streamable on the model."
    end
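To illustrate the "raise, don't truncate" choice, here is a rough sketch of a prefix-aware length check. Everything in it is an assumption made for the example: the module name, the maximum length of 47, the prefix `"pgbus"`, and the formula (maximum minus prefix minus one separator). The real budget comes from `Key.queue_name_budget`, whose implementation is not shown on this page.

```ruby
# Hypothetical sketch; the constants and the formula are assumptions,
# not the values pgbus actually uses.
module NameBudgetSketch
  MAX_QUEUE_NAME_LENGTH = 47
  QUEUE_PREFIX = "pgbus"

  class NameTooLong < StandardError; end

  # Assumed budget: whatever is left after the prefix and one separator.
  def self.budget(prefix: QUEUE_PREFIX, max: MAX_QUEUE_NAME_LENGTH)
    max - prefix.length - 1
  end

  # Raises instead of truncating, so two long names can never collapse
  # into the same shortened queue name.
  def self.validate!(name)
    limit = budget
    return name if name.length <= limit

    raise NameTooLong, "#{name.inspect} is #{name.length} chars, budget is #{limit}"
  end
end

ok = NameBudgetSketch.validate!("room:42")
error = begin
  NameBudgetSketch.validate!("x" * 100)
rescue NameBudgetSketch::NameTooLong => e
  e
end
```

Computing the budget inside the call, rather than caching it in a constant, matches the documented behaviour of reading the prefix at call time.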
Instance Method Details
#broadcast(payload, visible_to: nil) ⇒ Object
Broadcasts a Turbo Stream HTML payload through the pgbus streamer. PGMQ's `message` column is JSONB, so raw HTML strings can't be passed directly. The payload is wrapped in a hash under an `"html"` key on the way in and unwrapped in Pgbus::Web::Streamer::StreamEventDispatcher before delivery to the SSE client. Callers pass a plain HTML string; the wrapping is an implementation detail.
Transactional semantics: if this call is made inside an open ActiveRecord transaction, the PGMQ insert is deferred to an after_commit callback. If the transaction rolls back, the broadcast silently drops — clients never see the change that the database never persisted. This is the feature no other Rails real-time stack (including turbo-rails over ActionCable) can offer: the broadcast and the data mutation are atomic with respect to each other. Returns the assigned msg_id when sent synchronously, nil when deferred (the id isn’t known until the after_commit callback runs).
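The deferral described above can be sketched with stand-in objects. `FakeTransaction`, `FakeClient`, its `publish` method, and `BroadcastSketch` are all hypothetical; they only mimic the shape of an object with an `after_commit` hook and a client that returns a message id. The sketch makes no claim about how pgbus detects the open transaction.

```ruby
# Hypothetical stand-in for an open database transaction.
class FakeTransaction
  def initialize
    @callbacks = []
  end

  def after_commit(&block)
    @callbacks << block
  end

  # Runs the queued callbacks, as a real commit would.
  def commit!
    @callbacks.each(&:call)
    @callbacks.clear
  end

  # Discards the queued callbacks without running them.
  def rollback!
    @callbacks.clear
  end
end

# Hypothetical client that records sends and hands out sequential ids.
class FakeClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(name, message)
    @sent << [name, message]
    @sent.length
  end
end

module BroadcastSketch
  # Returns the id when sent immediately, nil when deferred to commit.
  def self.call(client, name, html, transaction: nil)
    wrapped = { "html" => html.to_s }
    if transaction
      transaction.after_commit { client.publish(name, wrapped) }
      nil
    else
      client.publish(name, wrapped)
    end
  end
end

client = FakeClient.new
immediate_id = BroadcastSketch.call(client, "room:1", "<p>hi</p>")

committed = FakeTransaction.new
deferred_id = BroadcastSketch.call(client, "room:1", "<p>saved</p>", transaction: committed)
committed.commit!

rolled_back = FakeTransaction.new
BroadcastSketch.call(client, "room:1", "<p>lost</p>", transaction: rolled_back)
rolled_back.rollback!

delivered = client.sent.map { |_, message| message["html"] }
```

Only the immediate and the committed broadcasts reach the client; the one queued inside the rolled-back transaction is never sent.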
Audience filtering: pass `visible_to:` with a filter label (a Symbol previously registered via Pgbus::Streams.filters.register) to restrict delivery to connections whose authorize-hook context satisfies the predicate. The label travels with the broadcast through PGMQ; the predicate itself lives in-process on the subscriber side and is evaluated per-connection by the Dispatcher.
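The split between a label that travels with the message and a predicate that stays in-process might look roughly like the sketch below. `FilterRegistrySketch`, the block-based `register` signature, and the hash-shaped context are assumptions made for illustration; only the `register` method name and the `"visible_to"` key appear in the source.

```ruby
# Hypothetical registry; the argument shape of the real
# Pgbus::Streams.filters.register is assumed, not documented here.
class FilterRegistrySketch
  def initialize
    @predicates = {}
  end

  # Labels are stored as strings because the broadcast side sends
  # the label through `to_s` before it reaches the queue.
  def register(label, &predicate)
    @predicates[label.to_s] = predicate
  end

  # Evaluated once per connection. Messages without a label go to everyone.
  # Denying unknown labels is a choice made for this sketch.
  def deliver?(message, context)
    label = message["visible_to"]
    return true if label.nil?

    predicate = @predicates[label]
    predicate ? !!predicate.call(context) : false
  end
end

filters = FilterRegistrySketch.new
filters.register(:admins_only) { |ctx| ctx[:role] == "admin" }

secret = { "html" => "<p>secret</p>", "visible_to" => :admins_only.to_s }
admin_sees  = filters.deliver?(secret, { role: "admin" })
guest_sees  = filters.deliver?(secret, { role: "guest" })
public_seen = filters.deliver?({ "html" => "<p>hi</p>" }, { role: "guest" })
```

Keeping the predicate out of the message means only a short string is serialized, and each subscriber process applies its own registered logic to its own connections.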
    # File 'lib/pgbus/streams.rb', line 71
    def broadcast(payload, visible_to: nil)
      ensure_queue!
      wrapped = { "html" => payload.to_s }
      wrapped["visible_to"] = visible_to.to_s if visible_to
      transaction = current_open_transaction
      if transaction
        transaction.after_commit { @client.(@name, wrapped) }
        nil
      else
        @client.(@name, wrapped)
      end
    end
#current_msg_id ⇒ Object
    # File 'lib/pgbus/streams.rb', line 84
    def current_msg_id
      @client.stream_current_msg_id(@name)
    end
#ensure! ⇒ Object
    # File 'lib/pgbus/streams.rb', line 92
    def ensure!
      ensure_queue!
      self
    end
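`ensure!` delegates to a private `ensure_queue!` that is not listed on this page. Given the `@ensured` flag and `@ensure_mutex` set up in the constructor, one plausible shape is a mutex-guarded one-time call, sketched below. `EnsureOnceSketch` and its `create_calls` counter are hypothetical, and the sketch covers only the per-instance guard, not any per-process, per-name bookkeeping.

```ruby
# Hypothetical sketch of a run-once guard; not the pgbus implementation.
class EnsureOnceSketch
  attr_reader :create_calls

  def initialize
    @ensured = false
    @ensure_mutex = Mutex.new
    @create_calls = 0
  end

  # Returns self so the call can be chained.
  def ensure!
    ensure_queue!
    self
  end

  private

  def ensure_queue!
    return if @ensured # fast path once the queue exists

    @ensure_mutex.synchronize do
      next if @ensured # another thread finished while this one waited

      @create_calls += 1 # stand-in for the real queue-creation call
      @ensured = true
    end
  end
end

handle = EnsureOnceSketch.new
threads = 8.times.map { Thread.new { handle.ensure! } }
threads.each(&:join)
chained = handle.ensure!
```

The second check inside the lock is what keeps concurrent first calls from each running the creation step.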
#presence ⇒ Object
Returns a Pgbus::Streams::Presence handle for this stream. The Presence object exposes join/leave/touch/members/sweep! for tracking who is currently subscribed. See lib/pgbus/streams/presence.rb for the API.
    # File 'lib/pgbus/streams.rb', line 101
    def presence
      @presence ||= Presence.new(self)
    end
#read_after(after_id:, limit: 500) ⇒ Object
    # File 'lib/pgbus/streams.rb', line 88
    def read_after(after_id:, limit: 500)
      @client.read_after(@name, after_id: after_id, limit: limit)
    end