Class: Feat::SSEParser

Inherits:
Object
Defined in:
lib/feat/sse.rb

Overview

Incremental Server-Sent Events (SSE) parser. It is pure: it performs no I/O itself.

Feed raw response bytes with #feed; the parser buffers them, splits on "\n" boundaries (stripping a trailing "\r"), and yields one Hash per dispatched event:

{ event: "put", data: "<json>", id: "42" }

Per the SSE wire format:

- "field: value" lines set the event/data/id of the pending event;
  a single leading space after the colon is stripped from the value.
- "data:" lines accumulate and are joined with "\n".
- A blank line dispatches the pending event.
- Lines starting with ":" are comments (heartbeats) and are ignored.
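The rules above can be illustrated with a minimal, standalone sketch. It is not the Feat implementation: parse_sse_lines is a hypothetical helper that works on lines that have already been split, and it assumes that a blank line with no pending "data:" lines dispatches nothing (as the WHATWG event-stream algorithm specifies), which this page does not confirm for SSEParser.

```ruby
# Hypothetical helper, not part of Feat: applies the field rules to
# already-split lines and returns the dispatched events as Hashes.
def parse_sse_lines(lines)
  events = []
  pending = { event: nil, data: [], id: nil }

  lines.each do |line|
    if line.empty?
      # Blank line: dispatch the pending event, then start a fresh one.
      # Assumption: nothing is dispatched when no data lines are pending.
      unless pending[:data].empty?
        events << { event: pending[:event],
                    data: pending[:data].join("\n"),
                    id: pending[:id] }
      end
      pending = { event: nil, data: [], id: nil }
    elsif line.start_with?(":")
      next # comment line (heartbeat), ignored
    else
      field, value = line.split(":", 2)
      # Strip exactly one leading space; further spaces belong to the value.
      value = (value || "").delete_prefix(" ")
      case field
      when "event" then pending[:event] = value
      when "data"  then pending[:data] << value
      when "id"    then pending[:id] = value
      end
    end
  end

  events
end

lines = ["event: put", "data: line one", "data: line two", "id: 42", "",
         ": heartbeat", ""]
events = parse_sse_lines(lines)
# events holds a single Hash; the heartbeat produces no event.
```

The two "data:" lines end up in one event whose data is "line one\nline two".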

Constant Summary

MAX_EVENT_BYTES =

Upper bound on the bytes held for one in-progress event. Mirrors Client::MAX_DATAFILE_BYTES so the stream path is bounded the same way the poll path is.

10 * 1024 * 1024
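The constant evaluates to 10,485,760 bytes (10 MiB). How the cap is enforced is not shown on this page; the following is only a sketch of one plausible check. EventTooLarge and check_event_size! are hypothetical names, and counting both the unterminated line buffer and the accumulated data lines is an assumption.

```ruby
# Hypothetical error class and helper; neither name comes from Feat.
class EventTooLarge < StandardError; end

MAX_EVENT_BYTES = 10 * 1024 * 1024 # 10 MiB = 10_485_760 bytes

# Returns the number of bytes currently held, or raises once the cap is
# exceeded. Assumption: both the raw line buffer and the pending data
# lines count toward the cap.
def check_event_size!(buffer, data_lines, max_event_bytes: MAX_EVENT_BYTES)
  held = buffer.bytesize + data_lines.sum(&:bytesize)
  if held > max_event_bytes
    raise EventTooLarge, "event holds #{held} bytes, cap is #{max_event_bytes}"
  end
  held
end
```

Measuring with bytesize rather than length keeps the bound in bytes even when the payload contains multibyte characters.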

Instance Method Summary

Constructor Details

#initialize(max_event_bytes: MAX_EVENT_BYTES) ⇒ SSEParser

Returns a new instance of SSEParser.



# File 'lib/feat/sse.rb', line 26

def initialize(max_event_bytes: MAX_EVENT_BYTES)
  @max_event_bytes = max_event_bytes
  @buffer = +""
  reset_event
end

Instance Method Details

#feed(chunk) ⇒ Object

Append a chunk of bytes and yield each fully formed event to the given block.



# File 'lib/feat/sse.rb', line 33

def feed(chunk)
  @buffer << chunk
  while (idx = @buffer.index("\n"))
    line = @buffer.slice!(0, idx + 1)
    # String#chomp strips a trailing "\r\n", "\n", or "\r".
    process_line(line.chomp) { |event| yield event }
  end
  # A line that never terminates, or a single oversized data field, must
  # not grow the buffers without bound. Abort once past the cap.
  guard_size!
end
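To show how the buffering loop behaves when a line is split across chunks, here is a standalone sketch that reproduces only the line-draining part of #feed. drain_lines is a hypothetical helper; process_line and guard_size! are not shown on this page and are left out.

```ruby
# Hypothetical helper: appends a chunk and drains every complete line,
# mirroring the index/slice!/chomp loop in #feed.
def drain_lines(buffer, chunk)
  buffer << chunk
  lines = []
  while (idx = buffer.index("\n"))
    # slice! removes the line (including its "\n") from the buffer;
    # chomp then drops the trailing "\n" or "\r\n".
    lines << buffer.slice!(0, idx + 1).chomp
  end
  lines
end

buffer = +"" # unfrozen String, as in #initialize
first  = drain_lines(buffer, "event: pu")             # no newline yet
second = drain_lines(buffer, "t\r\ndata: 1\n\nda")    # completes three lines
# first  is []
# second is ["event: put", "data: 1", ""]
# buffer still holds the partial line "da"
```

The empty string at the end of second is the blank line that would dispatch the event. Because the loop searches only for "\n", a stream that terminates lines with a bare "\r" would stay in the buffer until a "\n" arrives.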