Module: Pgbus::Streams::Envelope

Defined in:
lib/pgbus/streams/envelope.rb

Overview

Encodes Server-Sent Events frames per html.spec.whatwg.org/multipage/server-sent-events.html.

Pgbus uses three frame types:

- `message(id:, event:, data:)` — a real broadcast (carries an `id:` so the client
  can resume via `Last-Event-ID` on reconnect)
- `comment(text)` — a heartbeat or sentinel that the SSE parser ignores
- `retry_directive(ms)` — tells `EventSource` how long to wait before reconnecting
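
To make the three frame types concrete, here is a minimal sketch of how a client-side parser might interpret them. It is not a spec-compliant SSE parser and is not part of Pgbus: `interpret_frames`, the sample stream, and the `turbo-stream` event name are assumptions chosen for illustration.

```ruby
# Minimal SSE frame interpreter (a sketch, not a full spec-compliant parser).
# It only understands the three frame shapes listed above.
def interpret_frames(stream)
  events = []
  retry_ms = nil

  # Frames are separated by a blank line ("\n\n").
  stream.split("\n\n").each do |frame|
    fields = {}
    frame.each_line(chomp: true) do |line|
      next if line.start_with?(":") # comment line: ignored by the parser

      name, value = line.split(":", 2)
      fields[name] = value.to_s.delete_prefix(" ")
    end

    retry_ms = Integer(fields["retry"]) if fields.key?("retry")
    if fields.key?("data")
      events << { id: fields["id"], event: fields["event"], data: fields["data"] }
    end
  end

  { events: events, retry_ms: retry_ms }
end

# Hypothetical stream: a retry directive, a heartbeat comment, one message.
stream = "retry: 3000\n\n" \
         ": heartbeat\n\n" \
         "id: 42\nevent: turbo-stream\ndata: <turbo-stream></turbo-stream>\n\n"

result = interpret_frames(stream)
```

Only the message frame produces an event; the comment is dropped and the retry directive only updates the reconnection delay.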

All frames end with `\n\n` (the SSE event terminator). `data:` lines must not contain newlines: the SSE spec uses `\n` as the field terminator, so a raw newline inside the payload would end the `data:` field early and truncate or split the event. We strip `\r` and `\n` from data and comment text rather than splitting into multiple `data:` lines, because Turbo Stream HTML is already flat and the simpler encoding is easier to debug.
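
The trade-off described above can be sketched side by side. The `strip_newlines` helper is not shown on this page, so the version below assumes it deletes line breaks outright (the real one may substitute something else); `split_into_data_lines` is a hypothetical name for the alternative the module decides against.

```ruby
NEWLINES = /[\r\n]+/

# Assumption: strip_newlines removes line breaks entirely. The real helper is
# not shown in this documentation and may use a different replacement.
def strip_newlines(text)
  text.gsub(NEWLINES, "")
end

# The rejected alternative: emit one data: line per payload line.
def split_into_data_lines(text)
  text.split(NEWLINES).map { |line| "data: #{line}" }.join("\n")
end

payload = "<div>\r\n  hello\n</div>"

stripped = "data: #{strip_newlines(payload)}\n\n"
split    = "#{split_into_data_lines(payload)}\n\n"
```

The stripped form always yields exactly one `data:` line per frame, which is what makes raw frames easy to read in a network inspector.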

Constant Summary

NEWLINES =
/[\r\n]+/
RESPONSE_HEADERS =
"HTTP/1.1 200 OK\r\n" \
"content-type: text/event-stream\r\n" \
"cache-control: no-cache, no-transform\r\n" \
"x-accel-buffering: no\r\n" \
"connection: keep-alive\r\n" \
"\r\n"

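Class Method Summary

- `.comment(text) ⇒ Object`
- `.http_response_headers ⇒ Object`
- `.message(id:, event:, data:) ⇒ Object`
- `.retry_directive(milliseconds) ⇒ Object`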

Class Method Details

.comment(text) ⇒ Object



# File 'lib/pgbus/streams/envelope.rb', line 35

def self.comment(text)
  ": #{strip_newlines(text.to_s)}\n\n"
end

.http_response_headers ⇒ Object



# File 'lib/pgbus/streams/envelope.rb', line 47

def self.http_response_headers
  RESPONSE_HEADERS
end
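
Because `RESPONSE_HEADERS` is a complete raw HTTP/1.1 response head, one plausible use is writing it straight to a client socket before the first frame. The sketch below assumes that usage; `StringIO` stands in for the socket and the `: connected` comment is an illustrative first frame, neither taken from the source.

```ruby
require "stringio"

# Copied from the constant summary above so the example runs standalone.
RESPONSE_HEADERS =
  "HTTP/1.1 200 OK\r\n" \
  "content-type: text/event-stream\r\n" \
  "cache-control: no-cache, no-transform\r\n" \
  "x-accel-buffering: no\r\n" \
  "connection: keep-alive\r\n" \
  "\r\n"

# StringIO stands in for the client socket (an assumption for illustration).
io = StringIO.new
io.write(RESPONSE_HEADERS)  # header block, written once per connection
io.write(": connected\n\n") # first SSE frame, here a comment

# Split the written bytes back into head and body to inspect them.
head, body = io.string.split("\r\n\r\n", 2)
status_line, *header_lines = head.split("\r\n")
headers = header_lines.to_h { |line| line.split(": ", 2) }
```

Note the two line-ending conventions: the HTTP head uses `\r\n`, while the SSE frames that follow use `\n`.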

.message(id:, event:, data:) ⇒ Object

Raises:

  • (ArgumentError) if `id` is nil, or if `event` is nil or empty


# File 'lib/pgbus/streams/envelope.rb', line 28

def self.message(id:, event:, data:)
  raise ArgumentError, "id is required" if id.nil?
  raise ArgumentError, "event is required" if event.nil? || event.to_s.empty?

  "id: #{id}\nevent: #{event}\ndata: #{strip_newlines(data.to_s)}\n\n"
end
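
The `id:` field exists so a reconnecting client can resume from its `Last-Event-ID`. The sketch below shows one way a server could use it to replay missed frames. `BACKLOG` and `frames_since` are hypothetical, the ids are assumed to be increasing integers, and the newline stripping is assumed to delete line breaks; none of this is confirmed by the source beyond the `message` body itself.

```ruby
# Inlined copy of the documented .message body so the sketch runs standalone.
# Assumption: strip_newlines deletes CR/LF runs; the real helper is not shown.
def message(id:, event:, data:)
  raise ArgumentError, "id is required" if id.nil?
  raise ArgumentError, "event is required" if event.nil? || event.to_s.empty?

  "id: #{id}\nevent: #{event}\ndata: #{data.to_s.gsub(/[\r\n]+/, '')}\n\n"
end

# Hypothetical backlog with increasing integer ids (not part of the source).
BACKLOG = [
  { id: 1, event: "turbo-stream", data: "<p>one</p>" },
  { id: 2, event: "turbo-stream", data: "<p>two</p>" },
  { id: 3, event: "turbo-stream", data: "<p>three</p>" }
].freeze

# Returns the frames a reconnecting client has not seen yet.
# last_event_id is the raw Last-Event-ID header value (nil on first connect).
def frames_since(last_event_id)
  BACKLOG.select { |m| m[:id] > last_event_id.to_i }
         .map { |m| message(**m) }
end

replayed = frames_since("1")
```

A client that reconnects with `Last-Event-ID: 1` would receive frames 2 and 3; a first-time client (no header) would receive all three.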

.retry_directive(milliseconds) ⇒ Object

Raises:

  • (ArgumentError) unless `milliseconds` is a non-negative Integer



# File 'lib/pgbus/streams/envelope.rb', line 39

def self.retry_directive(milliseconds)
  unless milliseconds.is_a?(Integer) && !milliseconds.negative?
    raise ArgumentError, "retry must be a non-negative integer (got #{milliseconds.inspect})"
  end

  "retry: #{milliseconds}\n\n"
end
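
The validation in `retry_directive` is strict about type: floats and numeric strings are rejected rather than coerced. The sketch below inlines the method body shown above as a top-level method so it runs standalone; the sample inputs are illustrative.

```ruby
# Inlined copy of the documented method body.
def retry_directive(milliseconds)
  unless milliseconds.is_a?(Integer) && !milliseconds.negative?
    raise ArgumentError, "retry must be a non-negative integer (got #{milliseconds.inspect})"
  end

  "retry: #{milliseconds}\n\n"
end

accepted = retry_directive(3000)

# Each of these fails validation; collect the error messages for inspection.
rejected = [-1, 1.5, "3000", nil].map do |value|
  retry_directive(value)
  :accepted
rescue ArgumentError => e
  e.message
end
```

Callers holding a duration in seconds or as a string need to convert it to an Integer number of milliseconds before calling.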