Module: Pgbus::StreamsHelper
Included in: Pgbus::Streams::TurboStreamOverride
Defined in: app/helpers/pgbus/streams_helper.rb
Overview
View helper for subscribing a page to one or more pgbus streams. This
is the drop-in replacement for turbo_stream_from — same API, same
streamable resolution (GlobalID objects, symbols, arrays), same
signed-name verification path — but the rendered element speaks to
Pgbus::Web::StreamApp via SSE instead of ActionCable.
The critical difference from turbo_stream_from is the since-id
attribute: it carries the current PGMQ msg_id watermark at render
time so the streamer can replay anything published in the gap between
the controller render and the client connecting. This is the fix for
rails/rails#52420.
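The gap that since-id closes can be pictured with a toy in-memory log. This is only a sketch of the idea: ToyStream, publish, and replay_since are hypothetical names made up for illustration, not part of pgbus or PGMQ.

```ruby
# Hypothetical in-memory stand-in for a PGMQ-backed stream; not pgbus code.
class ToyStream
  Message = Struct.new(:msg_id, :payload)

  def initialize
    @messages = []
  end

  # msg_id of the newest message, or 0 when nothing has been published yet.
  def current_msg_id
    @messages.empty? ? 0 : @messages.last.msg_id
  end

  # Appends a message with the next sequential msg_id.
  def publish(payload)
    @messages << Message.new(current_msg_id + 1, payload)
  end

  # Payloads of everything published after the given watermark, oldest first.
  def replay_since(since_id)
    @messages.select { |m| m.msg_id > since_id }.map(&:payload)
  end
end

stream = ToyStream.new
stream.publish("before render")         # already reflected in the rendered HTML

since_id = stream.current_msg_id        # watermark captured at render time
stream.publish("published in the gap")  # arrives before the client connects

# On connect the client presents since_id and receives only what it missed.
missed = stream.replay_since(since_id)
```

Without the watermark, the second message would be lost: it was published after the HTML was rendered but before any subscriber existed.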
Instance Method Summary
- #pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object
  Renders a <pgbus-stream-source> custom element.
Instance Method Details
#pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object
Renders a <pgbus-stream-source> custom element (a Turbo stream source; see connectStreamSource).
The replay: option controls which messages are delivered on the
initial connect:
- :watermark (default) delivers only broadcasts published after this
  render (the page-born-stale fix). since_id = current_msg_id.
- :all delivers every message still in PGMQ retention. since_id = 0.
  Useful for chat rooms where the page should show backlog on load.
- N (Integer) delivers the last N messages. since_id = max(0, current_msg_id - N).
  Useful when the backlog is large and you want a capped history.
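The three rules above reduce to a small mapping from the replay: value to since_id. The sketch below is illustrative only: since_id_for is a hypothetical name, it takes current_msg_id directly rather than looking it up from a stream name, and the ArgumentError for unsupported values is an assumption. The real compute_since_id is not shown on this page and may differ.

```ruby
# Hypothetical helper mirroring the documented rules; not the pgbus
# implementation of compute_since_id.
def since_id_for(replay, current_msg_id)
  case replay
  when :watermark then current_msg_id                    # nothing older than this render
  when :all       then 0                                 # everything still retained
  when Integer    then [current_msg_id - replay, 0].max  # last N, clamped at 0
  else
    # Assumption: unsupported values are rejected rather than ignored.
    raise ArgumentError, "unsupported replay option: #{replay.inspect}"
  end
end

since_id_for(:watermark, 120) # => 120
since_id_for(:all, 120)       # => 0
since_id_for(50, 120)         # => 70
since_id_for(500, 120)        # => 0
```

The clamp matters for young streams: asking for the last 500 messages when only 120 exist must not produce a negative since_id.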
# File 'app/helpers/pgbus/streams_helper.rb', line 32

def pgbus_stream_from(*streamables, replay: :watermark, **html_attributes)
  stream_name = Pgbus::Streams::Stream.name_from(streamables.length == 1 ? streamables.first : streamables)
  signed_name = Pgbus::Streams::SignedName.sign(stream_name)
  since_id = compute_since_id(stream_name, replay)

  attributes = {
    "src" => pgbus_stream_src(signed_name),
    "signed-stream-name" => signed_name,
    "since-id" => since_id.to_s,
    # Compatibility shim: turbo-rails' cable_stream_source_element reads
    # `channel` to decide which ActionCable channel to subscribe to.
    # We emit the same attribute so a page that mistakenly uses the
    # turbo-rails element still renders (with ActionCable semantics).
    "channel" => "Turbo::StreamsChannel"
  }.merge(html_attributes.transform_keys(&:to_s))

  element = render_tag("pgbus-stream-source", attributes)
  script = pgbus_stream_source_script_tag
  return element unless script

  safe_concat(script, element)
end
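One consequence of the listing is worth spelling out: html_attributes is merged last, after its keys are converted to strings, so a caller-supplied attribute replaces a computed one with the same name. The standalone sketch below reproduces only that merge step. build_attributes and the sample values are hypothetical, chosen for illustration.

```ruby
# Hypothetical stand-in for the attribute-building step; values are made up.
def build_attributes(defaults, html_attributes)
  # Symbol keys such as :class become "class", so they collide with (and
  # override) string-keyed defaults of the same name.
  defaults.merge(html_attributes.transform_keys(&:to_s))
end

defaults = {
  "src" => "/example/stream",          # placeholder, not a real pgbus URL
  "since-id" => "42",
  "channel" => "Turbo::StreamsChannel"
}

attrs = build_attributes(defaults, { class: "hidden", "since-id": "0" })
# attrs["class"] is "hidden" (added); attrs["since-id"] is "0" (the
# caller's value replaced the default); attrs["channel"] is unchanged.
```

In practice this means extra HTML attributes such as class or data-* pass through, but passing a key like "since-id" or "src" would silently override the helper's own value.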