Module: Pgbus::Streams

Defined in:
lib/pgbus/streams.rb,
lib/pgbus/streams/key.rb,
lib/pgbus/streams/cursor.rb,
lib/pgbus/streams/filters.rb,
lib/pgbus/streams/envelope.rb,
lib/pgbus/streams/presence.rb,
lib/pgbus/streams/streamable.rb,
lib/pgbus/streams/signed_name.rb,
lib/pgbus/streams/turbo_broadcastable.rb,
lib/pgbus/streams/turbo_stream_override.rb,
lib/pgbus/streams/watermark_cache_middleware.rb

Overview

Pgbus::Streams is the SSE-based pub/sub subsystem that replaces `turbo_stream_from`. The user-facing entrypoint is `Pgbus.stream(name)`, which returns a `Pgbus::Streams::Stream` providing `#broadcast`, `#current_msg_id`, and `#read_after`.

Defined Under Namespace

Modules: Cursor, Envelope, Key, SignedName, Streamable, TurboBroadcastable, TurboStreamOverride

Classes: Filters, Presence, Stream, StreamNameTooLong, WatermarkCacheMiddleware

Class Method Details

.filters ⇒ Object

Process-wide registry of server-side audience filter predicates. Register filters at boot time via:

Pgbus::Streams.filters.register(:admin_only) { |user| user.admin? }

See lib/pgbus/streams/filters.rb for the full API.



# File 'lib/pgbus/streams.rb', line 22

def self.filters
  @filters ||= Filters.new
end
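
The memoized accessor above hands every caller the same registry object. As a rough illustration of how a registry of named predicates might behave, here is a minimal stand-in. `SketchFilters`, `SketchUser`, and the `#allow?` helper are hypothetical names invented for this sketch and are not taken from pgbus; only the `register(name) { |user| ... }` call shape comes from the description above. The real API is in lib/pgbus/streams/filters.rb.

```ruby
# Hypothetical stand-in for a filter registry; NOT the pgbus implementation.
class SketchFilters
  def initialize
    @predicates = {}
  end

  # Store a named predicate block, mirroring the documented call shape.
  def register(name, &block)
    raise ArgumentError, "block required" unless block

    @predicates[name.to_sym] = block
    self
  end

  # Hypothetical helper: run the named predicate against a user.
  # Raises KeyError if no predicate was registered under that name.
  def allow?(name, user)
    @predicates.fetch(name.to_sym).call(user)
  end
end

# Hypothetical user object exposing the admin? predicate used above.
SketchUser = Struct.new(:admin) do
  def admin?
    admin
  end
end

filters = SketchFilters.new
filters.register(:admin_only) { |user| user.admin? }

admin_allowed = filters.allow?(:admin_only, SketchUser.new(true))
guest_allowed = filters.allow?(:admin_only, SketchUser.new(false))
```

Registering at boot time keeps the set of predicates fixed before any request is served, which is presumably why the registry is process-wide rather than per-stream.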

.install_turbo_broadcastable_patch! ⇒ Object

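Prepends TurboBroadcastable to Turbo::StreamsChannel’s singleton class. Idempotent: the method returns early if the module is already among the singleton class’s ancestors, and prepending the same module twice is a no-op in any case. Does nothing if Turbo::StreamsChannel is not defined. Called from Pgbus::Engine’s initializer when Turbo is detected.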



# File 'lib/pgbus/streams/turbo_broadcastable.rb', line 46

def self.install_turbo_broadcastable_patch!
  return unless defined?(::Turbo::StreamsChannel)
  return if ::Turbo::StreamsChannel.singleton_class.include?(TurboBroadcastable)

  ::Turbo::StreamsChannel.singleton_class.prepend(TurboBroadcastable)
end
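
The guard-then-prepend pattern above can be tried in isolation. The sketch below uses stand-ins (`SketchChannel`, `SketchPatch`, `install_sketch_patch!`), all hypothetical and unrelated to the real Turbo or pgbus classes, to show that calling the installer twice leaves exactly one copy of the module in the singleton class's ancestor chain.

```ruby
# Stand-ins for illustration only; neither is the real Turbo or pgbus code.
class SketchChannel
  def self.broadcast(message)
    "cable:#{message}"
  end
end

module SketchPatch
  # Takes precedence over SketchChannel.broadcast once prepended to the
  # singleton class; calling `super` here would reach the original method.
  def broadcast(message)
    "pgbus:#{message}"
  end
end

def install_sketch_patch!
  return unless defined?(::SketchChannel)
  return if ::SketchChannel.singleton_class.include?(SketchPatch)

  ::SketchChannel.singleton_class.prepend(SketchPatch)
end

install_sketch_patch!
install_sketch_patch! # second call hits the guard and returns early

patch_count = SketchChannel.singleton_class.ancestors.count(SketchPatch)
result = SketchChannel.broadcast("hello")
```

Prepending to the singleton class, rather than reopening the class, keeps the original class method reachable through `super`.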

.install_turbo_stream_override! ⇒ Object

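Prepends TurboStreamOverride to Turbo::StreamsHelper. Idempotent: the method returns early if the module is already among the helper’s ancestors, and prepending the same module twice is a no-op in any case. Does nothing if Turbo::StreamsHelper is not defined. Called from Pgbus::Engine’s initializer when both turbo-rails and pgbus streams are enabled.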



# File 'lib/pgbus/streams/turbo_stream_override.rb', line 43

def self.install_turbo_stream_override!
  return unless defined?(::Turbo::StreamsHelper)
  return if ::Turbo::StreamsHelper.ancestors.include?(TurboStreamOverride)

  ::Turbo::StreamsHelper.prepend(TurboStreamOverride)
end

.reset_filters! ⇒ Object

Clears the filters registry by discarding the memoized instance, so the next call to .filters builds a fresh one. Intended for tests, not for runtime use.



# File 'lib/pgbus/streams.rb', line 27

def self.reset_filters!
  @filters = nil
end
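
To see why resetting matters in a test suite, the sketch below reproduces the memoize-then-reset pair on a stand-in module. `SketchStreams` is a hypothetical name, and a plain `Object` takes the place of the real `Filters` instance. It assumes nothing about pgbus beyond the two method bodies shown on this page.

```ruby
# Hypothetical stand-in mirroring the memoize/reset pair; not pgbus itself.
module SketchStreams
  # Build the registry on first access and reuse it afterwards.
  def self.filters
    @filters ||= Object.new
  end

  # Drop the memoized registry so the next access builds a new one.
  def self.reset_filters!
    @filters = nil
  end
end

first = SketchStreams.filters
same  = SketchStreams.filters # memoized: same object as `first`

SketchStreams.reset_filters!
fresh = SketchStreams.filters # a new object after the reset
```

Calling the reset from a test teardown hook would keep filters registered in one example from leaking into the next.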