Module: Pgbus::Streams::Envelope
Defined in: lib/pgbus/streams/envelope.rb
Overview
Encodes Server-Sent Events frames per html.spec.whatwg.org/multipage/server-sent-events.html.
Pgbus uses three frame types:
- `message(id:, event:, data:)` — a real broadcast (carries an `id:` so the client
can resume via `Last-Event-ID` on reconnect)
- `comment(text)` — a heartbeat or sentinel that the SSE parser ignores
- `retry_directive(ms)` — tells `EventSource` how long to wait before reconnecting
All frames end with `\n\n` (the SSE event terminator). `data:` lines must not contain newlines: the SSE spec uses `\n` as the field terminator, so a multi-line payload would be truncated or split across events. We strip `\r` and `\n` from data and comment text rather than splitting into multiple `data:` lines, because Turbo Stream HTML is already flat and the simpler encoding is easier to debug.
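The three frame shapes and the newline stripping described above can be sketched as follows. `EnvelopeSketch` is a stand-in written for this example so that it runs without pgbus loaded. Its `strip_newlines` is an assumption: the real helper is private and not shown on this page, and it may replace newlines with something other than the empty string. The event name and payload are illustrative only.

```ruby
# Stand-in for Pgbus::Streams::Envelope, reduced to the framing rules above.
module EnvelopeSketch
  NEWLINES = /[\r\n]+/

  # ASSUMPTION: newlines are removed outright; the real private helper may differ.
  def self.strip_newlines(text)
    text.gsub(NEWLINES, "")
  end

  # A real broadcast: id, event name, and a single flat data line.
  def self.message(id:, event:, data:)
    "id: #{id}\nevent: #{event}\ndata: #{strip_newlines(data.to_s)}\n\n"
  end

  # A line starting with ":" is a comment, which SSE parsers ignore.
  def self.comment(text)
    ": #{strip_newlines(text.to_s)}\n\n"
  end

  # Reconnection delay hint for the client, in milliseconds.
  def self.retry_directive(milliseconds)
    "retry: #{milliseconds}\n\n"
  end
end

# The CRLF inside the payload is stripped, so the frame keeps one data line.
frame = EnvelopeSketch.message(
  id: 7,
  event: "turbo-stream",
  data: "<turbo-stream>\r\n</turbo-stream>"
)
heartbeat = EnvelopeSketch.comment("ping")
backoff = EnvelopeSketch.retry_directive(3000)
```

Every frame ends with a blank line, so frames can be concatenated onto the same stream in any order.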
Constant Summary
- NEWLINES =
    /[\r\n]+/
- RESPONSE_HEADERS =
    "HTTP/1.1 200 OK\r\n" \
    "content-type: text/event-stream\r\n" \
    "cache-control: no-cache, no-transform\r\n" \
    "x-accel-buffering: no\r\n" \
    "connection: keep-alive\r\n" \
    "\r\n"
Class Method Summary
- .comment(text) ⇒ Object
- .http_response_headers ⇒ Object
- .message(id:, event:, data:) ⇒ Object
- .retry_directive(milliseconds) ⇒ Object
Class Method Details
.comment(text) ⇒ Object
    # File 'lib/pgbus/streams/envelope.rb', line 35
    def self.comment(text)
      ": #{strip_newlines(text.to_s)}\n\n"
    end
.http_response_headers ⇒ Object
    # File 'lib/pgbus/streams/envelope.rb', line 47
    def self.http_response_headers
      RESPONSE_HEADERS
    end
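The returned string is a complete HTTP/1.1 response preamble: a status line, four headers, and the blank line that ends the header block. One plausible use is writing it to a raw socket before any frames; that is an assumption, since this page shows only the constant and its accessor. In the sketch below, `response_headers` is a local copy of the documented value and `StringIO` stands in for the socket.

```ruby
require "stringio"

# Local copy of the documented RESPONSE_HEADERS value, so this runs without pgbus.
response_headers =
  "HTTP/1.1 200 OK\r\n" \
  "content-type: text/event-stream\r\n" \
  "cache-control: no-cache, no-transform\r\n" \
  "x-accel-buffering: no\r\n" \
  "connection: keep-alive\r\n" \
  "\r\n"

# ASSUMPTION: StringIO stands in for a raw client socket.
socket = StringIO.new
socket.write(response_headers)
socket.write(": connected\n\n") # first SSE frame: a comment the parser ignores

# Split the preamble from the body the way an HTTP client would.
head, body = socket.string.split("\r\n\r\n", 2)
status_line, *header_lines = head.split("\r\n")
headers = header_lines.to_h { |line| line.split(": ", 2) }
```

Because the preamble already ends with the empty `\r\n` line, SSE frames can follow it directly with no extra separator.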
.message(id:, event:, data:) ⇒ Object
    # File 'lib/pgbus/streams/envelope.rb', line 28
    def self.message(id:, event:, data:)
      raise ArgumentError, "id is required" if id.nil?
      raise ArgumentError, "event is required" if event.nil? || event.to_s.empty?

      "id: #{id}\nevent: #{event}\ndata: #{strip_newlines(data.to_s)}\n\n"
    end
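To show what a client recovers from a message frame, here is a simplified line-based parser. `parse_sse_frame` is a hypothetical helper written for this example and is not part of pgbus. It covers only single-line fields and comments, not the full SSE parsing algorithm, and the frame is a hand-written sample in the format produced above.

```ruby
# Parses one SSE frame into a hash of field name => value.
# Simplified sketch: skips comments and blank lines, and does not
# concatenate repeated data: lines as the full spec requires.
def parse_sse_frame(frame)
  frame.each_line(chomp: true).each_with_object({}) do |line, fields|
    next if line.empty? || line.start_with?(":")

    name, value = line.split(":", 2)
    # The spec removes exactly one leading space from the value.
    fields[name] = value.to_s.delete_prefix(" ")
  end
end

frame = "id: 42\nevent: update\ndata: <div>hello</div>\n\n"
fields = parse_sse_frame(frame)
```

The `id` field is the value a browser would send back in `Last-Event-ID` when it reconnects.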
.retry_directive(milliseconds) ⇒ Object
    # File 'lib/pgbus/streams/envelope.rb', line 39
    def self.retry_directive(milliseconds)
      unless milliseconds.is_a?(Integer) && !milliseconds.negative?
        raise ArgumentError, "retry must be a non-negative integer (got #{milliseconds.inspect})"
      end

      "retry: #{milliseconds}\n\n"
    end
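The type check is strict: floats, numeric strings, and `nil` are all rejected, so loosely typed configuration has to be coerced first. The sketch below illustrates this with `retry_directive_sketch`, a stand-in that copies the validation shown above. `retry_frame_from_config` is a hypothetical helper, not part of pgbus.

```ruby
# Stand-in mirroring the documented validation, so this runs without pgbus.
def retry_directive_sketch(milliseconds)
  unless milliseconds.is_a?(Integer) && !milliseconds.negative?
    raise ArgumentError, "retry must be a non-negative integer (got #{milliseconds.inspect})"
  end

  "retry: #{milliseconds}\n\n"
end

# Hypothetical helper: coerce a raw config value (for example an ENV string)
# with Kernel#Integer before building the frame.
def retry_frame_from_config(raw)
  retry_directive_sketch(Integer(raw))
end

accepted = retry_frame_from_config("3000")

# Each of these fails validation; collect the error messages.
rejected = [3000.5, -1, "3000", nil].map do |value|
  retry_directive_sketch(value)
rescue ArgumentError => e
  e.message
end
```

`Kernel#Integer` raises on malformed input rather than returning `0`, so a bad value fails at coercion and never reaches the client as a zero delay.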