Module: Pgbus::StreamsHelper

Included in:
Pgbus::Streams::TurboStreamOverride
Defined in:
app/helpers/pgbus/streams_helper.rb

Overview

View helper for subscribing a page to one or more pgbus streams. This is the drop-in replacement for ‘turbo_stream_from` — same API, same streamable resolution (GlobalID objects, symbols, arrays), same signed-name verification path — but the rendered element speaks to `Pgbus::Web::StreamApp` via SSE instead of ActionCable.

The critical difference from ‘turbo_stream_from` is the `since-id` attribute: it carries the current PGMQ `msg_id` watermark at render time so the streamer can replay anything published in the gap between the controller render and the client connecting. This is the fix for rails/rails#52420.

Instance Method Summary collapse

Instance Method Details

#pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object

Renders a <pgbus-stream-source> custom element. The element’s JS (shipped separately under app/javascript/pgbus/stream_source_element.js) opens an SSE connection to /pgbus/streams/<signed-name>?since=<cursor>, listens for messages, and forwards each turbo-stream HTML payload into Turbo via ‘connectStreamSource`.

The ‘replay:` option controls which messages get delivered on the initial connect:

- :watermark (default) — only broadcasts published after this
  render's moment (the page-born-stale fix). since_id = current_msg_id.
- :all — deliver every message still in PGMQ retention. since_id = 0.
  Useful for chat rooms where the page should show backlog on load.
- N (Integer) — deliver the last N messages. since_id = max(0, current_msg_id - N).
  Useful when the backlog is large and you want a capped history.


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'app/helpers/pgbus/streams_helper.rb', line 32

def pgbus_stream_from(*streamables, replay: :watermark, **html_attributes)
  stream_name = Pgbus::Streams::Stream.name_from(streamables.length == 1 ? streamables.first : streamables)
  signed_name = Pgbus::Streams::SignedName.sign(stream_name)

  since_id = compute_since_id(stream_name, replay)

  attributes = {
    "src" => pgbus_stream_src(signed_name),
    "signed-stream-name" => signed_name,
    "since-id" => since_id.to_s,
    # Compatibility shim: turbo-rails' cable_stream_source_element reads
    # `channel` to decide which ActionCable channel to subscribe to.
    # We emit the same attribute so a page that mistakenly uses the
    # turbo-rails element still renders (with ActionCable semantics).
    "channel" => "Turbo::StreamsChannel"
  }.merge(html_attributes.transform_keys(&:to_s))

  element = render_tag("pgbus-stream-source", attributes)
  script = pgbus_stream_source_script_tag
  return element unless script

  safe_concat(script, element)
end