Class: Conduit::Stream
- Inherits: Object
  - Object
    - Conduit::Stream
- Defined in: lib/conduit/stream.rb
Overview
Core streaming parser for Server-Sent Events (SSE).
Instance Attribute Summary
- #last_event_id ⇒ Object (readonly)
  Stream state: the last event ID seen, per SSE spec semantics.
- #retry_ms ⇒ Object (readonly)
  Stream state: the last retry value seen, per SSE spec semantics.
Instance Method Summary
- #<<(chunk) ⇒ self
  Feed a chunk of data to the stream for processing.
- #each {|parsed| ... } ⇒ Enumerator, self
  Enumerable interface for iterating over parsed events.
- #finish ⇒ self (also: #close)
  Signal end-of-input.
- #initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream (constructor)
  Initialize a new Stream with optional customizations.
- #on_chunk {|chunk| ... } ⇒ Object
  Raw chunk as it arrived (after normalization).
- #on_error {|error| ... } ⇒ Object
  Errors raised by any callback or by the parser.
- #on_event {|event| ... } ⇒ Object
  Fully parsed SSE event as an Event.
- #on_field {|name, value| ... } ⇒ Object
  Every parsed SSE field line.
- #on_frame {|frame| ... } ⇒ Object
  Complete frame text (after sanitization), regardless of whether it produces an event.
- #on_parsed {|parsed| ... } ⇒ Object
  Result of running the configured parser over an event’s data.
- #on_ping {|frame| ... } ⇒ Object
  Ping/comment frame.
Constructor Details
#initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream
Initialize a new Stream with optional customizations.
# File 'lib/conduit/stream.rb', line 18
def initialize(
  parser:,
  chunk_normalizer: nil,
  frame_separator: nil,
  payload_start: nil,
  ping_pattern: nil,
  sanitize_pattern: nil
)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  @parser = parser
  @chunk_normalizer = chunk_normalizer || Defaults::CHUNK_NORMALIZER
  @sanitize_pattern = sanitize_pattern || Defaults::SANITIZE_PATTERN
  @frame_separator = frame_separator || Defaults::FRAME_SEPARATOR
  @payload_start = payload_start || Defaults::PAYLOAD_START
  @ping_pattern = ping_pattern || Defaults::PING_PATTERN
  @data_field = @payload_start.chomp(":")
  @buffer = +""
  @callbacks = Callbacks.new
  @last_event_id = nil
  @retry_ms = nil
end
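The guard clause in the constructor only checks `respond_to?(:call)`, so despite the wording of the error message, a parser does not have to be a Proc. The sketch below illustrates that check in isolation. The helpers `callable_parser?` and `data_field_for` and the `LineCounter` class are hypothetical names invented for this example, and `"data:"` is an assumed value for the default payload prefix, which this page does not show.

```ruby
require "json"

# Hypothetical helper mirroring the constructor's guard clause.
def callable_parser?(candidate)
  candidate.respond_to?(:call)
end

# Any object that defines #call qualifies, not only Proc instances.
class LineCounter
  def call(data)
    data.lines.count
  end
end

# Hypothetical helper: derive a field name from a payload prefix,
# the same way the constructor derives @data_field.
def data_field_for(payload_start)
  payload_start.chomp(":")
end

puts callable_parser?(JSON.method(:parse)) # Method object
puts callable_parser?(->(data) { data })   # lambda
puts callable_parser?(LineCounter.new)     # custom callable object
puts callable_parser?("plain string")      # String does not define #call
puts data_field_for("data:")               # assumed prefix, see lead-in
```

A Method object such as `JSON.method(:parse)` is often the shortest way to plug an existing parser in without writing a wrapper lambda.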
Instance Attribute Details
#last_event_id ⇒ Object (readonly)
Stream state: the last event ID seen, per SSE spec semantics.
# File 'lib/conduit/stream.rb', line 42
def last_event_id
  @last_event_id
end
#retry_ms ⇒ Object (readonly)
Stream state: the last retry value seen, per SSE spec semantics.
# File 'lib/conduit/stream.rb', line 42
def retry_ms
  @retry_ms
end
Instance Method Details
#<<(chunk) ⇒ self
Feed a chunk of data to the stream for processing.
Chunks are typically received from an HTTP stream (e.g., Net::HTTP response body). The chunk is normalized, buffered, and then processed for complete frames. Returns self for method chaining.
# File 'lib/conduit/stream.rb', line 140
def <<(chunk)
  chunk = normalize_chunk(chunk)
  @callbacks.emit(:chunk, chunk)

  @buffer << chunk
  process_frames

  self
end
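Because chunks can end anywhere, a frame may arrive split across several calls. The following is a minimal sketch of the buffer-then-split idea, assuming a `"\n\n"` separator. `FrameBuffer`, `feed`, and `pending` are hypothetical names; the private `process_frames` method is not shown on this page and may work differently.

```ruby
# Hypothetical stand-in for the buffering step; not Conduit's implementation.
class FrameBuffer
  def initialize(separator = "\n\n")
    @separator = separator
    @buffer = +""
  end

  # Append a chunk and return every frame that the chunk completed.
  def feed(chunk)
    @buffer << chunk
    frames = []
    while (index = @buffer.index(@separator))
      frames << @buffer.slice!(0, index)     # text before the separator
      @buffer.slice!(0, @separator.length)   # discard the separator itself
    end
    frames
  end

  # Bytes received so far that do not yet form a complete frame.
  def pending
    @buffer.dup
  end
end

buffer = FrameBuffer.new
p buffer.feed("data: he")             # no separator seen yet
p buffer.feed("llo\n\ndata: wor")     # first frame is now complete
p buffer.pending                      # partial second frame stays buffered
```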
#each {|parsed| ... } ⇒ Enumerator, self
Enumerable interface for iterating over parsed events.
Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers an on_parsed callback and returns self for chaining.
# File 'lib/conduit/stream.rb', line 175
def each(&block)
  return enum_for(:each) unless block

  on_parsed(&block)
  self
end
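One consequence of this design is that the block passed to `each` does not run immediately: it is stored as an `on_parsed` callback and runs later, as data is fed in. The sketch below reproduces that shape with a hypothetical `MiniStream` class; its `emit` method is an invented stand-in for the point at which the real stream produces a parsed result.

```ruby
# Hypothetical stand-in that copies the shape of #each shown above.
class MiniStream
  def initialize
    @handlers = []
  end

  def on_parsed(&block)
    @handlers << block
  end

  def each(&block)
    return enum_for(:each) unless block

    on_parsed(&block)
    self
  end

  # Invented for the sketch: delivers a value to every registered block.
  def emit(value)
    @handlers.each { |handler| handler.call(value) }
    self
  end
end

seen = []
stream = MiniStream.new
stream.each { |value| seen << value } # registers the block, returns the stream
stream.emit("first").emit("second")   # the block runs here, not at #each
p seen
```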
#finish ⇒ self Also known as: close
Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.
Call this when the underlying transport closes cleanly without a trailing "\n\n" (typical for many HTTP SSE servers). Safe to call multiple times; safe to call on an empty buffer; safe to keep using the stream afterwards.
# File 'lib/conduit/stream.rb', line 158
def finish
  return self if @buffer.empty?

  remainder = @buffer.slice!(0, @buffer.length)
  process_frame(remainder)
  self
end
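The repeat-call safety follows from `String#slice!`, which returns the removed text and leaves the buffer empty, so a second call hits the early return. A small sketch of that drain step on its own, where `drain` is a hypothetical helper name:

```ruby
# Hypothetical helper mirroring the drain step in #finish.
def drain(buffer)
  return nil if buffer.empty?

  # slice! removes the range from the receiver and returns what it removed.
  buffer.slice!(0, buffer.length)
end

buffer = +"data: unterminated tail"
p drain(buffer) # the leftover text, ready to be processed as a final frame
p buffer        # now empty
p drain(buffer) # nothing left, so a repeat call is a no-op
```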
#on_chunk {|chunk| ... } ⇒ Object
Raw chunk as it arrived (after normalization).
The chunk is a string that has been normalized (UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.
# File 'lib/conduit/stream.rb', line 51
def on_chunk(&block)
  @callbacks.on(:chunk, &block)
end
#on_error {|error| ... } ⇒ Object
Errors raised by any callback or by the parser.
When a callback (other than on_error itself) or the parser raises an error, it’s routed to this handler if registered. This prevents errors from interrupting the stream processing.
If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.
# File 'lib/conduit/stream.rb', line 128
def on_error(&block)
  @callbacks.on(:error, &block)
end
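The routing rules above can be sketched with a small callback registry. This is an assumption about how such routing might be written, not the library's code: `MiniCallbacks` is a hypothetical class, and the real `Callbacks` class is not shown on this page.

```ruby
# Hypothetical callback registry; Conduit's Callbacks class may differ.
class MiniCallbacks
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def on(name, &block)
    @handlers[name] << block
    self
  end

  def emit(name, *args)
    @handlers[name].each do |handler|
      begin
        handler.call(*args)
      rescue StandardError => error
        # Re-raise when the error handler itself failed,
        # or when no error handler is registered.
        raise if name == :error || @handlers[:error].empty?

        emit(:error, error)
      end
    end
    self
  end
end

callbacks = MiniCallbacks.new
callbacks.on(:error)  { |error| warn "handled: #{error.message}" }
callbacks.on(:parsed) { |_value| raise ArgumentError, "bad payload" }
callbacks.emit(:parsed, 42) # routed to the error handler, processing continues
```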
#on_event {|event| ... } ⇒ Object
Fully parsed SSE event as an Event.
This callback receives a Conduit::Event object with the following attributes:
- event: Event type (defaults to "message")
- data: Joined data field content (data lines joined by "\n")
- id: Last event ID from the SSE spec
- retry: Retry delay in milliseconds from the SSE spec
Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry).
# File 'lib/conduit/stream.rb', line 79
def on_event(&block)
  @callbacks.on(:event, &block)
end
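To make the attribute rules concrete, here is a rough sketch that assembles an event from already-parsed field pairs. It is illustrative only: `build_event` is a hypothetical helper that returns a plain Hash rather than a Conduit::Event, it uses the lenient `to_i` for the retry value, and it ignores any carry-over of `id` and `retry` between frames that the stream itself may perform.

```ruby
# Hypothetical builder; returns a Hash, not a Conduit::Event.
# `fields` is an array of [name, value] pairs in arrival order.
def build_event(fields)
  data_lines = fields.select { |name, _value| name == "data" }
                     .map { |_name, value| value }
  return nil if data_lines.empty? # no data field, so no event

  lookup = fields.to_h # for repeated names, the last value wins
  retry_value = lookup["retry"]
  {
    event: lookup.fetch("event", "message"),
    data: data_lines.join("\n"),
    id: lookup["id"],
    retry: retry_value && retry_value.to_i
  }
end

p build_event([["event", "update"], ["data", "line 1"], ["data", "line 2"], ["id", "7"]])
p build_event([["id", "9"]]) # frame without data
```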
#on_field {|name, value| ... } ⇒ Object
Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.
Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.
# File 'lib/conduit/stream.rb', line 114
def on_field(&block)
  @callbacks.on(:field, &block)
end
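For reference, the field-line grammar in the SSE specification splits each line at the first colon and drops at most one leading space from the value; a line with no colon is a field with an empty value, and a line starting with a colon is a comment. The sketch below follows those rules. `parse_field_line` is a hypothetical helper, and Conduit's own parsing may differ in edge cases.

```ruby
# Hypothetical helper following the SSE field-line rules.
# Returns [name, value], or nil for blank and comment lines.
def parse_field_line(line)
  return nil if line.empty? || line.start_with?(":")

  name, value = line.split(":", 2)       # split at the first colon only
  value = (value || "").delete_prefix(" ") # drop a single leading space
  [name, value]
end

p parse_field_line("data: hello")
p parse_field_line("retry:3000")
p parse_field_line("x-trace: a:b") # later colons stay in the value
p parse_field_line("data")         # no colon: empty value
p parse_field_line(": keep-alive") # comment line
```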
#on_frame {|frame| ... } ⇒ Object
Complete frame text (after sanitization), regardless of whether it produces an event.
A frame is the text between frame separators (default: "\n\n"). This callback receives the raw frame string after sanitization (default: strip). This includes frames that may not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.
# File 'lib/conduit/stream.rb', line 63
def on_frame(&block)
  @callbacks.on(:frame, &block)
end
#on_parsed {|parsed| ... } ⇒ Object
Result of running the configured parser over an event’s data.
The parser receives ONLY the data field content (joined by "\n"), not the entire frame. If you need access to other SSE fields (event type, id, retry), use on_event instead.
If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event.
# File 'lib/conduit/stream.rb', line 92
def on_parsed(&block)
  @callbacks.on(:parsed, &block)
end
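A practical effect of joining the data lines first is that a payload split over several `data:` lines reaches the parser as one string. A minimal sketch, using `JSON.method(:parse)` as an example parser; `parse_data` is a hypothetical helper, not part of Conduit.

```ruby
require "json"

# Hypothetical helper: hand the parser the joined data lines and nothing else.
def parse_data(data_lines, parser)
  parser.call(data_lines.join("\n"))
end

# A JSON object sent as two data lines parses as a single document.
p parse_data(['{"token":', '"hello"}'], JSON.method(:parse))
```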
#on_ping {|frame| ... } ⇒ Object
Ping/comment frame.
Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.
# File 'lib/conduit/stream.rb', line 103
def on_ping(&block)
  @callbacks.on(:ping, &block)
end
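The split between ping frames and ordinary frames might look roughly like the sketch below. It rests on two assumptions that this page does not confirm: that the ping pattern is a Regexp matching a leading colon, and that sanitization is a plain `strip`. `classify_frame` is a hypothetical helper.

```ruby
# Hypothetical dispatcher: decide which callback a frame would go to.
# The default pattern (leading colon) is an assumption, see lead-in.
def classify_frame(raw, ping_pattern = /\A:/)
  frame = raw.strip # assumed sanitization step
  return [:ping, frame] if ping_pattern.match?(frame)

  [:frame, frame]
end

p classify_frame(": keep-alive\n")
p classify_frame("data: hi\n")
```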