Module: Pgbus::Streams
Defined in:
- lib/pgbus/streams.rb
- lib/pgbus/streams/key.rb
- lib/pgbus/streams/cursor.rb
- lib/pgbus/streams/filters.rb
- lib/pgbus/streams/envelope.rb
- lib/pgbus/streams/presence.rb
- lib/pgbus/streams/renderer.rb
- lib/pgbus/streams/coalescer.rb
- lib/pgbus/streams/streamable.rb
- lib/pgbus/streams/signed_name.rb
- lib/pgbus/streams/turbo_broadcastable.rb
- lib/pgbus/streams/turbo_stream_override.rb
- lib/pgbus/streams/broadcastable_override.rb
- lib/pgbus/streams/watermark_cache_middleware.rb
Overview
Pgbus::Streams is the SSE-based pub/sub subsystem that replaces `turbo_stream_from`. The user-facing entrypoint is `Pgbus.stream(name)`, which returns a `Pgbus::Streams::Stream` providing `#broadcast`, `#current_msg_id`, and `#read_after`.
Defined Under Namespace
Modules: BroadcastableOverride, Cursor, Envelope, Key, Renderer, SignedName, Streamable, TurboBroadcastable, TurboStreamOverride
Classes: Coalescer, Filters, Presence, Stream, StreamNameTooLong, WatermarkCacheMiddleware
Constant Summary
- DEFAULT_SSE_EVENT =
The default SSE `event:` name for a broadcast frame. Turbo’s StreamObserver consumes frames that the client re-dispatches as the `message` DOM event, so the client maps this SSE event name to `message`. Broadcasts may override it with a typed name (issue #170).
"turbo-stream"
Class Method Summary
- .coalescer ⇒ Object
  Process-wide publish-side coalescer for high-frequency broadcasts (issue #171).
- .filters ⇒ Object
  Process-wide registry of server-side audience filter predicates.
- .install_turbo_broadcastable_patch! ⇒ Object
  Apply the patch to Turbo::StreamsChannel’s singleton class.
- .install_turbo_stream_override! ⇒ Object
  Apply the patch to Turbo::StreamsHelper.
- .reset_coalescer! ⇒ Object
  Clears the coalescer.
- .reset_filters! ⇒ Object
  Clears the filters registry.
Class Method Details
.coalescer ⇒ Object
Process-wide publish-side coalescer for high-frequency broadcasts (issue #171). Lazily built; the flush re-enters the normal broadcast path so a coalesced frame is just a deferred ordinary broadcast.
# File 'lib/pgbus/streams.rb', line 40
def self.coalescer
  # `target:` is part of the flush signature but unused here: it's only
  # a coalescing key; the target is already encoded in the payload's
  # turbo-stream tag. Absorbed via ** so it isn't a broadcast argument.
  @coalescer ||= Coalescer.new(
    flush: lambda do |stream_name:, payload:, opts:, **|
      Pgbus.stream(stream_name).broadcast(payload, **opts)
    end
  )
end
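The `**` in the flush signature is what lets the coalescer pass `target:` without it reaching `broadcast`. The following is a minimal standalone sketch of that keyword-absorbing behaviour, not Pgbus code: `BROADCASTS`, `FLUSH`, `STRICT_FLUSH`, and the `example_option` key are hypothetical stand-ins, and the recording array takes the place of `Pgbus.stream(stream_name).broadcast`.

```ruby
# Stand-in for Pgbus.stream(name).broadcast: records calls instead of publishing.
BROADCASTS = []

# Same keyword signature as the flush lambda above; the trailing ** swallows
# any extra keywords such as `target:`.
FLUSH = lambda do |stream_name:, payload:, opts:, **|
  BROADCASTS << [stream_name, payload, opts]
end

# Without **, a lambda rejects keywords it does not declare.
STRICT_FLUSH = lambda do |stream_name:, payload:, opts:|
  BROADCASTS << [stream_name, payload, opts]
end

# `example_option` is a made-up key used only for illustration.
FLUSH.call(stream_name: "chat", payload: "<turbo-stream></turbo-stream>",
           opts: { example_option: true }, target: "messages")

# The strict lambda raises before its body runs, so nothing is recorded.
STRICT_REJECTED =
  begin
    STRICT_FLUSH.call(stream_name: "chat", payload: "", opts: {}, target: "messages")
    false
  rescue ArgumentError
    true
  end
```

Because lambdas enforce their keyword list strictly, dropping the `**` would turn every coalesced flush that carries `target:` into an `ArgumentError`.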
.filters ⇒ Object
# File 'lib/pgbus/streams.rb', line 28
def self.filters
  @filters ||= Filters.new
end
.install_turbo_broadcastable_patch! ⇒ Object
Apply the patch to Turbo::StreamsChannel’s singleton class. Idempotent: prepending the same module twice is a no-op. Called from Pgbus::Engine’s initializer when Turbo is detected.
# File 'lib/pgbus/streams/turbo_broadcastable.rb', line 52
def self.install_turbo_broadcastable_patch!
  return unless defined?(::Turbo::StreamsChannel)
  return if ::Turbo::StreamsChannel.singleton_class.include?(TurboBroadcastable)

  ::Turbo::StreamsChannel.singleton_class.prepend(TurboBroadcastable)
end
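The idempotence claim can be checked in isolation. This is a sketch under stated assumptions: `Greeter`, `Loud`, and `install_patch!` are hypothetical names standing in for `Turbo::StreamsChannel`, `TurboBroadcastable`, and the installer above, and the patch body here only upcases a string.

```ruby
# Hypothetical patch module: wraps a class-level method and calls super.
module Loud
  def greet
    super.upcase
  end
end

# Hypothetical target with a class method, like a channel's singleton API.
class Greeter
  def self.greet
    "hello"
  end
end

# Same guard-then-prepend shape as the installer above.
def install_patch!(klass, patch)
  return if klass.singleton_class.include?(patch)

  klass.singleton_class.prepend(patch)
end

install_patch!(Greeter, Loud)
install_patch!(Greeter, Loud) # second call changes nothing

PATCH_COUNT = Greeter.singleton_class.ancestors.count(Loud)
GREETING = Greeter.greet
```

After both calls the module appears once in the singleton class's ancestor chain, so the wrapped method runs a single time per call rather than stacking.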
.install_turbo_stream_override! ⇒ Object
Apply the patch to Turbo::StreamsHelper. Idempotent: prepending the same module twice is a no-op. Called from Pgbus::Engine’s initializer when both turbo-rails and pgbus streams are enabled.
# File 'lib/pgbus/streams/turbo_stream_override.rb', line 43
def self.install_turbo_stream_override!
  return unless defined?(::Turbo::StreamsHelper)
  return if ::Turbo::StreamsHelper.ancestors.include?(TurboStreamOverride)

  ::Turbo::StreamsHelper.prepend(TurboStreamOverride)
end
.reset_coalescer! ⇒ Object
Clears the coalescer. Used by tests; not intended for runtime.
# File 'lib/pgbus/streams.rb', line 52
def self.reset_coalescer!
  @coalescer = nil
end
.reset_filters! ⇒ Object
Clears the filters registry. Used by tests; not intended for runtime.
# File 'lib/pgbus/streams.rb', line 33
def self.reset_filters!
  @filters = nil
end
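The accessor and reset pairs follow the usual lazy-memoization pattern: the accessor builds the object on first use, and the reset drops it so the next call builds a fresh one. A minimal sketch of that lifecycle follows, using a hypothetical `Registry` module and an empty stand-in `Filters` class rather than the real Pgbus classes.

```ruby
# Hypothetical stand-in module; the real accessors live on Pgbus::Streams.
module Registry
  # Empty placeholder; the real Filters class has its own API.
  class Filters; end

  # Built on first access, then reused for the life of the process.
  def self.filters
    @filters ||= Filters.new
  end

  # Drops the memoized instance so the next access builds a new one.
  def self.reset_filters!
    @filters = nil
  end
end

FIRST = Registry.filters
SAME = Registry.filters.equal?(FIRST)    # memoized: same object
Registry.reset_filters!
FRESH = !Registry.filters.equal?(FIRST)  # rebuilt after the reset
```

In a test suite, calling the reset in a setup or teardown hook keeps state registered by one example from leaking into the next.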