Class: Feat::SSEParser
- Inherits:
-
Object
- Object
- Feat::SSEParser
- Defined in:
- lib/feat/sse.rb
Overview
Incremental Server-Sent Events parser. Pure: it does no IO.
Feed raw response bytes with #feed; the parser buffers, splits on newline boundaries, and yields one Hash per dispatched event:
{ event: "put", data: "<json>", id: "42" }
Per the SSE wire format:
- "field: value" lines set the event/data/id of the pending event;
a single leading space after the colon is stripped from the value.
- "data:" lines accumulate and are joined with "\n".
- A blank line dispatches the pending event.
- Lines starting with ":" are comments (heartbeats) and are ignored.
Constant Summary collapse
- MAX_EVENT_BYTES =
Upper bound on the bytes held for one in-progress event. Mirrors Client::MAX_DATAFILE_BYTES so the stream path is bounded the same way the poll path is.
10 * 1024 * 1024
Instance Method Summary collapse
-
#feed(chunk) ⇒ Object
Append a chunk of bytes and yield each fully-formed event.
-
#initialize(max_event_bytes: MAX_EVENT_BYTES) ⇒ SSEParser
constructor
A new instance of SSEParser.
Constructor Details
#initialize(max_event_bytes: MAX_EVENT_BYTES) ⇒ SSEParser
Returns a new instance of SSEParser.
26 27 28 29 30 |
# File 'lib/feat/sse.rb', line 26 def initialize(max_event_bytes: MAX_EVENT_BYTES) @max_event_bytes = max_event_bytes @buffer = +"" reset_event end |
Instance Method Details
#feed(chunk) ⇒ Object
Append a chunk of bytes and yield each fully-formed event.
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/feat/sse.rb', line 33 def feed(chunk) @buffer << chunk while (idx = @buffer.index("\n")) line = @buffer.slice!(0, idx + 1) # String#chomp strips a trailing "\r\n", "\n", or "\r". process_line(line.chomp) { |event| yield event } end # A line that never terminates, or a single oversized data field, must # not grow the buffers without bound. Abort once past the cap. guard_size! end |