Module: Pgbus::StreamsHelper

Included in:
Pgbus::Streams::TurboStreamOverride
Defined in:
app/helpers/pgbus/streams_helper.rb

Overview

View helper for subscribing a page to one or more pgbus streams. This is the drop-in replacement for turbo_stream_from — same API, same streamable resolution (GlobalID objects, symbols, arrays), same signed-name verification path — but the rendered element speaks to Pgbus::Web::StreamApp via SSE instead of ActionCable.

The critical difference from turbo_stream_from is the since-id attribute: it carries the current PGMQ msg_id watermark at render time so the streamer can replay anything published in the gap between the controller render and the client connecting. This is the fix for rails/rails#52420.

Instance Method Summary collapse

Instance Method Details

#pgbus_stream_from(*streamables, replay: :watermark, **html_attributes) ⇒ Object

Renders a custom element. The element's JS (shipped separately under app/javascript/pgbus/stream_source_element.js) opens an SSE connection to /pgbus/streams/?since=, listens for messages, and forwards each turbo-stream HTML payload into Turbo via connectStreamSource.

The replay: option controls which messages get delivered on the initial connect:

- :watermark (default) — only broadcasts published after this
render's moment (the page-born-stale fix). since_id = current_msg_id.
- :all — deliver every message still in PGMQ retention. since_id = 0.
Useful for chat rooms where the page should show backlog on load.
- N (Integer) — deliver the last N messages. since_id = max(0, current_msg_id - N).
Useful when the backlog is large and you want a capped history.


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'app/helpers/pgbus/streams_helper.rb', line 32

def pgbus_stream_from(*streamables, replay: :watermark, **html_attributes)
  stream_name = Pgbus::Streams::Stream.name_from(streamables.length == 1 ? streamables.first : streamables)
  signed_name = Pgbus::Streams::SignedName.sign(stream_name)

  since_id = compute_since_id(stream_name, replay)

  attributes = {
    "src" => pgbus_stream_src(signed_name),
    "signed-stream-name" => signed_name,
    "since-id" => since_id.to_s,
    # Compatibility shim: turbo-rails' cable_stream_source_element reads
    # `channel` to decide which ActionCable channel to subscribe to.
    # We emit the same attribute so a page that mistakenly uses the
    # turbo-rails element still renders (with ActionCable semantics).
    "channel" => "Turbo::StreamsChannel"
  }.merge(html_attributes.transform_keys(&:to_s))

  element = render_tag("pgbus-stream-source", attributes)
  script = pgbus_stream_source_script_tag
  return element unless script

  safe_concat(script, element)
end