Class: Pgbus::Streams::Stream
- Inherits: Object
- Defined in: lib/pgbus/streams.rb
Overview
A handle on a single logical stream. The name can be any string, an object responding to `to_gid_param`, or an array of streamables, which are joined with colons for compatibility with turbo-rails.
Stream is not a singleton: callers create instances ad hoc, but the underlying queue is created lazily and only once per process per name.
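The lazy, once-only queue creation described above can be sketched as a flag guarded by a mutex. This is a minimal illustration, not the pgbus implementation: `LazyQueueHandle` and its `creator:` argument are hypothetical, and the guard shown here is per instance, so deduplication across instances that share a name is assumed to happen elsewhere.

```ruby
# Hypothetical stand-in for a handle with lazy queue setup; not the pgbus code.
class LazyQueueHandle
  attr_reader :name

  def initialize(name, creator:)
    @name = name
    @creator = creator        # callable that performs the expensive creation
    @ensured = false
    @ensure_mutex = Mutex.new
  end

  # Creates the queue on first use; later calls are cheap no-ops.
  def ensure!
    return self if @ensured   # fast path once the queue exists

    @ensure_mutex.synchronize do
      unless @ensured         # re-check under the lock
        @creator.call(@name)
        @ensured = true
      end
    end
    self
  end
end

created = []
handle = LazyQueueHandle.new("chat:42", creator: ->(n) { created << n })
4.times.map { Thread.new { handle.ensure! } }.each(&:join)
created # the creator ran once despite four concurrent callers
```

Re-checking the flag inside the lock is what prevents two threads that both pass the fast path from creating the queue twice.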
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Class Method Summary
- .name_from(streamables) ⇒ Object
  Mirrors `Turbo::Streams::StreamName#stream_name_from`.
- .validate_name_length!(name, streamables) ⇒ Object
  Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting `Pgbus.stream_key`) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.
Instance Method Summary
- #broadcast(payload, visible_to: nil, durable: nil) ⇒ Object
  Broadcasts a Turbo Stream HTML payload through the pgbus streamer.
- #current_msg_id ⇒ Object
- #durable? ⇒ Boolean
- #ensure! ⇒ Object
- #initialize(streamables, client: Pgbus.client, durable: true) ⇒ Stream (constructor)
  A new instance of Stream.
- #presence ⇒ Object
  Returns a Pgbus::Streams::Presence handle for this stream.
- #read_after(after_id:, limit: 500) ⇒ Object
Constructor Details
#initialize(streamables, client: Pgbus.client, durable: true) ⇒ Stream
Returns a new instance of Stream.
    # File 'lib/pgbus/streams.rb', line 40
    def initialize(streamables, client: Pgbus.client, durable: true)
      @name = self.class.name_from(streamables)
      self.class.validate_name_length!(@name, streamables)
      @client = client
      @durable = durable
      @ensured = false
      @ensure_mutex = Mutex.new
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/pgbus/streams.rb', line 38
    def name
      @name
    end
Class Method Details
.name_from(streamables) ⇒ Object
Mirrors `Turbo::Streams::StreamName#stream_name_from`. Strings pass through; objects with `to_gid_param` or `to_param` are coerced; arrays are joined with `:`. The result is suitable both as a logical stream identifier and as the input to QueueNameValidator (after sanitisation).
    # File 'lib/pgbus/streams.rb', line 129
    def self.name_from(streamables)
      if streamables.is_a?(Array)
        streamables.map { |s| name_from(s) }.join(":")
      elsif streamables.respond_to?(:to_gid_param)
        streamables.to_gid_param
      elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol)
        streamables.to_param
      else
        streamables.to_s
      end
    end
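To see the coercion rules in isolation, the branching above can be copied into a stand-alone method and tried on a few inputs. `FakeRecord` and `FakeSlug` are hypothetical stand-ins for a GlobalID-aware model and an object with `to_param`; a real `to_gid_param` returns an encoded GlobalID rather than the readable string used here.

```ruby
# Stand-alone copy of the branching in name_from, for experimentation only.
def name_from(streamables)
  if streamables.is_a?(Array)
    streamables.map { |s| name_from(s) }.join(":")
  elsif streamables.respond_to?(:to_gid_param)
    streamables.to_gid_param
  elsif streamables.respond_to?(:to_param) && !streamables.is_a?(Symbol)
    streamables.to_param
  else
    streamables.to_s
  end
end

# Hypothetical stand-ins; not ActiveRecord models.
FakeRecord = Struct.new(:id) do
  def to_gid_param
    "gid-#{id}"
  end
end

FakeSlug = Struct.new(:slug) do
  def to_param
    slug
  end
end

name_from("chat")                          # => "chat"
name_from(:chat)                           # => "chat"
name_from([FakeRecord.new(7), :messages])  # => "gid-7:messages"
name_from(["room", FakeSlug.new("lobby")]) # => "room:lobby"
```

Because `to_gid_param` is checked before `to_param`, an object that responds to both is named by its GlobalID.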
.validate_name_length!(name, streamables) ⇒ Object
Enforces the pgbus queue-name budget at the Stream-construction boundary so a forgotten call site fails with an actionable error (pointing at the offending streamables and suggesting `Pgbus.stream_key`) instead of an opaque QueueNameValidator failure three frames deep in Client#ensure_stream_queue.
The budget is computed from `config.queue_prefix` at call time, so apps that override the prefix get the correct limit. The method does not mutate the name: silent truncation is a footgun for multi-tenant apps, where collisions would mix broadcasts across records. Callers who need a short, safe identifier should use `Pgbus.stream_key(…)` or include `Pgbus::Streams::Streamable` on their ActiveRecord models.
    # File 'lib/pgbus/streams.rb', line 154
    def self.validate_name_length!(name, streamables)
      budget = Key.queue_name_budget
      return if name.length <= budget

      raise StreamNameTooLong,
            "Stream name #{name.inspect} is #{name.length} chars, " \
            "exceeds pgbus budget of #{budget} " \
            "(queue_prefix=#{Pgbus.configuration.queue_prefix.inspect}, " \
            "pgbus_max_queue_name_length=#{QueueNameValidator::MAX_QUEUE_NAME_LENGTH}). " \
            "Streamables: #{streamables.inspect}. " \
            "Use Pgbus.stream_key(*streamables) to produce a safe short name, " \
            "or include Pgbus::Streams::Streamable on the model."
    end
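The sketch below illustrates why the check raises instead of truncating. It is self-contained and uses assumptions throughout: `ASSUMED_BUDGET` is an arbitrary number chosen for illustration (the real limit comes from `Key.queue_name_budget`), and the error class is a local stand-in for `StreamNameTooLong`.

```ruby
# Local stand-ins; the real budget is derived from the configured queue prefix.
class StreamNameTooLong < StandardError; end

ASSUMED_BUDGET = 40 # illustrative only, not the pgbus value

def validate_name_length!(name, streamables, budget: ASSUMED_BUDGET)
  return if name.length <= budget

  raise StreamNameTooLong,
        "Stream name #{name.inspect} is #{name.length} chars, " \
        "exceeds budget of #{budget}. Streamables: #{streamables.inspect}."
end

parts = ["tenant-acme-corporation", "project-quarterly-report", "comments"]
name = parts.join(":")

begin
  validate_name_length!(name, parts)
rescue StreamNameTooLong => e
  warn e.message # fails loudly at construction time
end

# Truncating instead would make two distinct streams share one name.
a = "tenant-acme-corporation:project-quarterly-report:comments"
b = "tenant-acme-corporation:project-quarterly-report:reactions"
a[0, ASSUMED_BUDGET] == b[0, ASSUMED_BUDGET] # => true
```

The last line shows the collision the documentation warns about: both names share their first 40 characters, so truncation would deliver comments and reactions to the same queue.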
Instance Method Details
#broadcast(payload, visible_to: nil, durable: nil) ⇒ Object
Broadcasts a Turbo Stream HTML payload through the pgbus streamer. PGMQ’s `message` column is JSONB, so raw HTML strings can’t be passed directly. The payload is wrapped in a hash under the `"html"` key on the way in and unwrapped in Pgbus::Web::Streamer::StreamEventDispatcher before delivery to the SSE client. Callers pass a plain HTML string; the wrapping is an implementation detail.
Transactional semantics: if this call is made inside an open ActiveRecord transaction, the PGMQ insert is deferred to an after_commit callback. If the transaction rolls back, the broadcast is silently dropped, so clients never see a change that the database never persisted. This is the feature no other Rails real-time stack (including turbo-rails over ActionCable) can offer: the broadcast and the data mutation are atomic with respect to each other.
Returns the assigned msg_id when sent synchronously, or nil when deferred (the id isn’t known until the after_commit callback runs).
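The deferral rule can be modelled without a database. In this sketch `FakeTransaction` and `FakeStream` are hypothetical stand-ins, and the transaction is passed in explicitly, whereas the real method detects an open transaction on its own.

```ruby
# Hypothetical stand-ins; not ActiveRecord or pgbus classes.
class FakeTransaction
  def initialize
    @callbacks = []
  end

  def after_commit(&block)
    @callbacks << block
  end

  def commit!
    @callbacks.each(&:call)
  end

  def rollback!
    @callbacks.clear # deferred work is discarded, never run
  end
end

class FakeStream
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Returns a message id when sent immediately, nil when deferred.
  def broadcast(html, transaction: nil)
    if transaction
      transaction.after_commit { deliver(html) }
      nil
    else
      deliver(html)
    end
  end

  private

  def deliver(html)
    @sent << html
    @sent.length # stand-in for the assigned msg_id
  end
end

stream = FakeStream.new
stream.broadcast("<p>now</p>")                    # => 1

tx = FakeTransaction.new
stream.broadcast("<p>later</p>", transaction: tx) # => nil
tx.commit!
stream.sent                                       # => ["<p>now</p>", "<p>later</p>"]

rolled_back = FakeTransaction.new
stream.broadcast("<p>never</p>", transaction: rolled_back)
rolled_back.rollback!
stream.sent.length                                # => 2
```

The nil return in the deferred case is a direct consequence of the design: the id does not exist until the commit callback performs the insert.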
Audience filtering: pass `visible_to:` with a filter label (a Symbol previously registered via Pgbus::Streams.filters.register) to restrict delivery to connections whose authorize-hook context satisfies the predicate. The label travels with the broadcast through PGMQ; the predicate itself lives in-process on the subscriber side and is evaluated per-connection by the Dispatcher.
Per-broadcast durability: `durable:` overrides the stream-level default for a single broadcast. `nil` (the default) defers to the stream’s own `durable?` setting; `true`/`false` flip the mode for this call only.
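A rough model of both rules follows. `FilterRegistry`, its `allowed?` method, and `resolve_durable` are hypothetical names invented for this sketch; the signature of the real `register` call and the handling of unknown labels are assumptions, not documented pgbus behaviour.

```ruby
# Hypothetical registry; the real one is reached via Pgbus::Streams.filters.
class FilterRegistry
  def initialize
    @predicates = {}
  end

  def register(label, &predicate)
    @predicates[label.to_sym] = predicate
  end

  # Unlabelled broadcasts reach everyone; unknown labels reach no one
  # (an assumed policy for this sketch).
  def allowed?(label, context)
    return true if label.nil?

    predicate = @predicates[label.to_sym]
    predicate ? !!predicate.call(context) : false
  end
end

# nil defers to the stream default; true/false override it for one call.
def resolve_durable(stream_default, override)
  override.nil? ? stream_default : override
end

filters = FilterRegistry.new
filters.register(:admins_only) { |ctx| ctx[:role] == "admin" }

# Only the label travels with the message; the predicate stays in-process.
message = { "html" => "<p>secret</p>", "visible_to" => "admins_only" }
connections = [{ id: 1, role: "admin" }, { id: 2, role: "guest" }]

recipients = connections.select { |ctx| filters.allowed?(message["visible_to"], ctx) }
recipients.map { |ctx| ctx[:id] } # => [1]

resolve_durable(true, nil)   # => true
resolve_durable(true, false) # => false
```

Sending a label rather than a predicate keeps the stored message serialisable and lets each subscriber process evaluate the rule against its own connection context.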
    # File 'lib/pgbus/streams.rb', line 79
    def broadcast(payload, visible_to: nil, durable: nil)
      wrapped = { "html" => payload.to_s }
      wrapped["visible_to"] = visible_to.to_s if visible_to

      use_durable = durable.nil? ? @durable : durable
      return broadcast_ephemeral(wrapped) unless use_durable

      ensure_queue!
      transaction = current_open_transaction
      instrument_payload = {
        stream: @name,
        visible_to: visible_to,
        deferred: !transaction.nil?,
        bytes: wrapped["html"].bytesize
      }
      Instrumentation.instrument("pgbus.stream.broadcast", instrument_payload) do
        if transaction
          transaction.after_commit { @client.(@name, wrapped) }
          nil
        else
          @client.(@name, wrapped)
        end
      end
    end
#current_msg_id ⇒ Object
    # File 'lib/pgbus/streams.rb', line 104
    def current_msg_id
      @client.stream_current_msg_id(@name)
    end
#durable? ⇒ Boolean
    # File 'lib/pgbus/streams.rb', line 49
    def durable?
      @durable
    end
#ensure! ⇒ Object
    # File 'lib/pgbus/streams.rb', line 112
    def ensure!
      ensure_queue!
      self
    end
#presence ⇒ Object
Returns a Pgbus::Streams::Presence handle for this stream. The Presence object exposes join/leave/touch/members/sweep! for tracking who is currently subscribed. See lib/pgbus/streams/presence.rb for the API.
    # File 'lib/pgbus/streams.rb', line 121
    def presence
      @presence ||= Presence.new(self)
    end
#read_after(after_id:, limit: 500) ⇒ Object
    # File 'lib/pgbus/streams.rb', line 108
    def read_after(after_id:, limit: 500)
      @client.read_after(@name, after_id: after_id, limit: limit)
    end