Module: Pgbus::Streams::Envelope
- Defined in:
- lib/pgbus/streams/envelope.rb
Overview
Encodes Server-Sent Events frames per html.spec.whatwg.org/multipage/server-sent-events.html.
Pgbus uses three frame types:
- `message(id:, event:, data:)` — a real broadcast (carries an `id:` so the client
can resume via `Last-Event-ID` on reconnect)
- `comment(text)` — a heartbeat or sentinel that the SSE parser ignores
- `retry_directive(ms)` — tells `EventSource` how long to wait before reconnecting
All frames end with ‘nn` (the SSE event terminator). `data:` lines must not contain newlines — the SSE spec uses `n` as the field terminator, so a multi-line payload would arrive as multiple events. We strip `r` and `n` from data and comment text rather than splitting into multiple `data:` lines, because Turbo Stream HTML is already flat and the simpler encoding is easier to debug.
Constant Summary collapse
- NEWLINES =
/[\r\n]+/- RESPONSE_HEADERS =
"HTTP/1.1 200 OK\r\n" \ "content-type: text/event-stream\r\n" \ "cache-control: no-cache, no-transform\r\n" \ "x-accel-buffering: no\r\n" \ "connection: keep-alive\r\n" \ "\r\n"
Class Method Summary collapse
- .comment(text) ⇒ Object
-
.connected(id:) ⇒ Object
Emits a ‘pgbus:connected` frame carrying the server-minted connection id as JSON.
- .http_response_headers ⇒ Object
- .message(id:, event:, data:) ⇒ Object
- .retry_directive(milliseconds) ⇒ Object
Class Method Details
.comment(text) ⇒ Object
41 42 43 |
# File 'lib/pgbus/streams/envelope.rb', line 41 def self.comment(text) ": #{strip_newlines(text.to_s)}\n\n" end |
.connected(id:) ⇒ Object
Emits a ‘pgbus:connected` frame carrying the server-minted connection id as JSON. Sent once, right after the open handshake, so the page can read its own connection id and send it back as `X-Pgbus-Connection` on action requests (actor-echo suppression, issue #165). Deliberately omits an `id:` line: this is connection metadata, not a broadcast, and giving it a cursor id would corrupt the client’s Last-Event-ID replay position on reconnect.
52 53 54 55 56 |
# File 'lib/pgbus/streams/envelope.rb', line 52 def self.connected(id:) raise ArgumentError, "id is required" if id.nil? || id.to_s.empty? "event: pgbus:connected\ndata: #{JSON.generate({ connectionId: id.to_s })}\n\n" end |
.http_response_headers ⇒ Object
66 67 68 |
# File 'lib/pgbus/streams/envelope.rb', line 66 def self.http_response_headers RESPONSE_HEADERS end |
.message(id:, event:, data:) ⇒ Object
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/pgbus/streams/envelope.rb', line 30 def self.(id:, event:, data:) raise ArgumentError, "id is required" if id.nil? raise ArgumentError, "event is required" if event.nil? || event.to_s.empty? # Strip newlines from BOTH event and data, not just data: each is # interpolated into its own SSE field line, so an unescaped \r/\n in # either would terminate the field early and let a crafted value # inject extra SSE fields (a forged id:/data:) into the frame. "id: #{id}\nevent: #{strip_newlines(event.to_s)}\ndata: #{strip_newlines(data.to_s)}\n\n" end |
.retry_directive(milliseconds) ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/pgbus/streams/envelope.rb', line 58 def self.retry_directive(milliseconds) unless milliseconds.is_a?(Integer) && !milliseconds.negative? raise ArgumentError, "retry must be a non-negative integer (got #{milliseconds.inspect})" end "retry: #{milliseconds}\n\n" end |