Module: Pgbus::Streams

Defined in:
lib/pgbus/streams.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/streams/renderer.rb,
lib/pgbus/streams/coalescer.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/streams/signed_name.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
lib/pgbus/streams/turbo_stream_override.rb,
lib/pgbus/streams/broadcastable_override.rb,
lib/pgbus/streams/watermark_cache_middleware.rb

Overview

Pgbus::Streams is the SSE-based pub/sub subsystem that replaces `turbo_stream_from`. The user-facing entrypoint is `Pgbus.stream(name)`, which returns a `Pgbus::Streams::Stream` providing `#broadcast`, `#current_msg_id`, and `#read_after`.

Defined Under Namespace

Modules: BroadcastableOverride, Cursor, Envelope, Key, Renderer, SignedName, Streamable, TurboBroadcastable, TurboStreamOverride

Classes: Coalescer, Filters, Presence, Stream, StreamNameTooLong, WatermarkCacheMiddleware

Constant Summary

DEFAULT_SSE_EVENT = "turbo-stream"

The default SSE `event:` name for a broadcast frame. Turbo’s StreamObserver consumes frames that the client re-dispatches as the `message` DOM event; the client maps this SSE event name to `message`. Broadcasts may override it with a typed name (issue #170).

Class Method Details

.coalescer ⇒ Object

Process-wide publish-side coalescer for high-frequency broadcasts (issue #171). Lazily built; the flush re-enters the normal broadcast path so a coalesced frame is just a deferred ordinary broadcast.



# File 'lib/pgbus/streams.rb', line 40

def self.coalescer
  # `target:` is part of the flush signature but unused here — it's only
  # a coalescing key; the target is already encoded in the payload's
  # turbo-stream tag. Absorbed via ** so it isn't a broadcast argument.
  @coalescer ||= Coalescer.new(
    flush: lambda do |stream_name:, payload:, opts:, **|
      Pgbus.stream(stream_name).broadcast(payload, **opts)
    end
  )
end
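
The keyword handling in the flush lambda can be tried in isolation. The following is a minimal sketch, not pgbus code: `calls` and the sample argument values are hypothetical stand-ins for the real broadcast path, and only the lambda's keyword signature is taken from the source above. It shows that a bare `**` lets the lambda accept `target:` without forwarding it.

```ruby
# Stand-in for the broadcast path: records what would be forwarded.
# (Hypothetical; the real flush calls Pgbus.stream(...).broadcast.)
calls = []

# Same keyword shape as the flush lambda above. The bare `**`
# swallows any extra keywords, such as `target:`.
flush = lambda do |stream_name:, payload:, opts:, **|
  calls << [stream_name, payload, opts]
end

# `target:` is accepted here but never reaches `calls`.
flush.call(
  stream_name: "chat",
  payload: "<turbo-stream></turbo-stream>",
  opts: { event: "typing" },
  target: "messages"
)
```

Without the trailing `**`, a lambda enforces its keyword list strictly, so passing `target:` would raise an `ArgumentError`.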

.filters ⇒ Object

Process-wide registry of server-side audience filter predicates. Register filters at boot time via:

Pgbus::Streams.filters.register(:admin_only) { |user| user.admin? }

See lib/pgbus/streams/filters.rb for the full API.



# File 'lib/pgbus/streams.rb', line 28

def self.filters
  @filters ||= Filters.new
end
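
To make the registration idiom concrete, here is a rough sketch of a name-to-predicate registry. It is an illustration only: `FilterRegistrySketch`, its `fetch` method, and `SketchUser` are hypothetical names invented for this example, and the actual `Filters` class in lib/pgbus/streams/filters.rb may store and evaluate predicates differently.

```ruby
# Hypothetical registry; NOT the real Pgbus::Streams::Filters class.
class FilterRegistrySketch
  def initialize
    @predicates = {}
  end

  # Store a block under a symbolic name, mirroring the
  # `register(:admin_only) { |user| ... }` call shape shown above.
  def register(name, &predicate)
    raise ArgumentError, "block required" unless predicate

    @predicates[name.to_sym] = predicate
    self
  end

  # Look up a predicate by name (assumed accessor, for illustration).
  def fetch(name)
    @predicates.fetch(name.to_sym)
  end
end

# Hypothetical user object exposing `admin?`.
SketchUser = Struct.new(:name, :admin) do
  def admin?
    admin
  end
end

registry = FilterRegistrySketch.new
registry.register(:admin_only) { |user| user.admin? }

# Apply the registered predicate to an audience.
admin_only = registry.fetch(:admin_only)
audience = [SketchUser.new("ada", true), SketchUser.new("bob", false)]
admins = audience.select(&admin_only)
```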

.install_turbo_broadcastable_patch! ⇒ Object

Apply the patch to Turbo::StreamsChannel’s singleton class. Idempotent: prepending the same module twice is a no-op. Called from Pgbus::Engine’s initializer when Turbo is detected.



# File 'lib/pgbus/streams/turbo_broadcastable.rb', line 52

def self.install_turbo_broadcastable_patch!
  return unless defined?(::Turbo::StreamsChannel)
  return if ::Turbo::StreamsChannel.singleton_class.include?(TurboBroadcastable)

  ::Turbo::StreamsChannel.singleton_class.prepend(TurboBroadcastable)
end
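
The idempotence claim can be checked with plain Ruby. The sketch below uses hypothetical stand-ins (`FakeChannel`, `LoggingPatch`, `install_patch!`) rather than Turbo or pgbus classes; it applies the same guard-then-prepend shape to a singleton class twice and inspects the ancestor chain.

```ruby
# Hypothetical patch module; wraps the original class method via `super`.
module LoggingPatch
  def broadcast(payload)
    "patched: #{super}"
  end
end

# Hypothetical stand-in for a class with a class-level method.
class FakeChannel
  def self.broadcast(payload)
    payload.to_s
  end
end

# Same shape as the installer above: skip if present, otherwise prepend
# onto the singleton class so class methods are intercepted.
def install_patch!(klass, patch)
  return if klass.singleton_class.include?(patch)

  klass.singleton_class.prepend(patch)
end

install_patch!(FakeChannel, LoggingPatch)
install_patch!(FakeChannel, LoggingPatch) # second call changes nothing

patch_count = FakeChannel.singleton_class.ancestors.count(LoggingPatch)
result = FakeChannel.broadcast("hi")
```

After both calls, `patch_count` is 1 and `result` is `"patched: hi"`, so the method is wrapped once rather than twice.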

.install_turbo_stream_override! ⇒ Object

Apply the patch to Turbo::StreamsHelper. Idempotent: prepending the same module twice is a no-op. Called from Pgbus::Engine’s initializer when both turbo-rails and pgbus streams are enabled.



# File 'lib/pgbus/streams/turbo_stream_override.rb', line 43

def self.install_turbo_stream_override!
  return unless defined?(::Turbo::StreamsHelper)
  return if ::Turbo::StreamsHelper.ancestors.include?(TurboStreamOverride)

  ::Turbo::StreamsHelper.prepend(TurboStreamOverride)
end

.reset_coalescer! ⇒ Object

Clears the coalescer. Used by tests; not intended for runtime.



# File 'lib/pgbus/streams.rb', line 52

def self.reset_coalescer!
  @coalescer = nil
end

.reset_filters! ⇒ Object

Clears the filters registry. Used by tests; not intended for runtime.



# File 'lib/pgbus/streams.rb', line 33

def self.reset_filters!
  @filters = nil
end
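
The reset helpers pair with the lazy `||=` accessors: setting the instance variable to nil makes the next access build a fresh object. A minimal sketch of that pattern, using a hypothetical `RegistryHolder` module and a plain `Object` in place of the real registry, looks like this:

```ruby
# Hypothetical holder; mirrors the memoize-and-reset shape shown above.
module RegistryHolder
  # Lazily build the value on first access and reuse it afterwards.
  def self.filters
    @filters ||= Object.new
  end

  # Drop the memoized value so the next access rebuilds it.
  def self.reset_filters!
    @filters = nil
  end
end

first = RegistryHolder.filters
same  = RegistryHolder.filters   # memoized: identical object
RegistryHolder.reset_filters!
fresh = RegistryHolder.filters   # rebuilt after the reset
```

In a test suite, calling the reset helper in a setup or teardown hook would keep state registered by one example from leaking into the next.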