Module: Pgbus::Streams::TurboBroadcastable
- Defined in:
- lib/pgbus/streams/turbo_broadcastable.rb
Overview
Runtime patch that redirects Turbo::StreamsChannel.broadcast_stream_to
through pgbus instead of ActionCable.server.broadcast. Applied at
Rails engine boot time when defined?(::Turbo::StreamsChannel) —
see Pgbus::Engine's initializer. When turbo-rails isn't loaded,
this patch is a no-op and pgbus streams continue to work via the
explicit Pgbus.stream(...).broadcast(...) API.
After the patch:
class Order < ApplicationRecord
broadcasts_to :account # existing turbo-rails API, unchanged
end
# In a controller:
@order.update!(status: "shipped")
# → Turbo::Broadcastable runs its after_update_commit callback
# → calls Turbo::StreamsChannel.broadcast_replace_to
# → which calls Turbo::StreamsChannel.broadcast_stream_to
# → which is patched to call Pgbus.stream(name).broadcast(content)
# → which inserts into PGMQ and fires NOTIFY
Zero changes to user code. The entire Turbo::Broadcastable API (broadcasts_to, broadcasts_refreshes, broadcast_replace_to, broadcast_append_later_to, broadcasts_refreshes_to, etc) reuses this code path because they all funnel through broadcast_stream_to.
Signed stream name reuse: we don't touch Turbo.signed_stream_verifier,
so any existing broadcasts_to :room call continues to generate
tokens that our Pgbus::Streams::SignedName.verify! accepts (as
long as Turbo.signed_stream_verifier_key is set, which the Rails
app is already responsible for).
Instance Method Summary collapse
Instance Method Details
#broadcast_stream_to(*streamables, content:) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pgbus/streams/turbo_broadcastable.rb', line 37 def broadcast_stream_to(*streamables, content:) name = stream_name_from(streamables) override = Thread.current[:pgbus_broadcast_durable] durable = if override.nil? Pgbus.configuration.streams_default_broadcast_mode == :durable else override end Pgbus.stream(name, durable: durable).broadcast( content, exclude: Thread.current[:pgbus_broadcast_exclude], visible_to: Thread.current[:pgbus_broadcast_visible_to], event: Thread.current[:pgbus_broadcast_event] ) end |