Module: Pgbus::StreamsHelper
- Included in:
- Pgbus::Streams::TurboStreamOverride
- Defined in:
- app/helpers/pgbus/streams_helper.rb
Overview
View helper for subscribing a page to one or more pgbus streams. This is the drop-in replacement for ‘turbo_stream_from` — same API, same streamable resolution (GlobalID objects, symbols, arrays), same signed-name verification path — but the rendered element speaks to `Pgbus::Web::StreamApp` via SSE instead of ActionCable.
The critical difference from ‘turbo_stream_from` is the `since-id` attribute: it carries the current PGMQ `msg_id` watermark at render time so the streamer can replay anything published in the gap between the controller render and the client connecting. This is the fix for rails/rails#52420.
Instance Method Summary collapse
-
#pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object
Renders a <pgbus-stream-source> custom element.
Instance Method Details
#pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object
Renders a <pgbus-stream-source> custom element. The element’s JS (shipped separately under app/javascript/pgbus/stream_source_element.js) opens an SSE connection to /pgbus/streams/<signed-name>?since=<cursor>, listens for messages, and forwards each turbo-stream HTML payload into Turbo via ‘connectStreamSource`.
The ‘replay:` option controls which messages get delivered on the initial connect:
- :watermark (default) — only broadcasts published after this
render's moment (the page-born-stale fix). since_id = current_msg_id.
- :all — deliver every message still in PGMQ retention. since_id = 0.
Useful for chat rooms where the page should show backlog on load.
- N (Integer) — deliver the last N messages. since_id = max(0, current_msg_id - N).
Useful when the backlog is large and you want a capped history.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'app/helpers/pgbus/streams_helper.rb', line 32 def pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) stream_name = Pgbus::Streams::Stream.name_from(streamables.length == 1 ? streamables.first : streamables) signed_name = Pgbus::Streams::SignedName.sign(stream_name) since_id = compute_since_id(stream_name, replay) attributes = { "src" => pgbus_stream_src(signed_name), "signed-stream-name" => signed_name, "since-id" => since_id.to_s, # Compatibility shim: turbo-rails' cable_stream_source_element reads # `channel` to decide which ActionCable channel to subscribe to. # We emit the same attribute so a page that mistakenly uses the # turbo-rails element still renders (with ActionCable semantics). "channel" => "Turbo::StreamsChannel" }.merge(html_attributes.transform_keys(&:to_s)) element = render_tag("pgbus-stream-source", attributes) script = pgbus_stream_source_script_tag return element unless script safe_concat(script, element) end |