Class: ConduitSSE::Stream
- Inherits:
-
Object
- Object
- ConduitSSE::Stream
- Defined in:
- lib/conduit_sse/stream.rb
Overview
Core streaming parser for Server-Sent Events (SSE).
See Config for the full list of configuration knobs and the two equivalent constructor forms (kwargs and block).
Instance Attribute Summary collapse
-
#config ⇒ ConduitSSE::Config
readonly
The frozen, validated configuration for this stream.
-
#state ⇒ ConduitSSE::State
readonly
The runtime mutable state (buffer, last_event_id, retry_ms, counters).
Instance Method Summary collapse
-
#<<(chunk) ⇒ self
Feed a chunk of data to the stream for processing.
-
#buffer_size ⇒ Integer
Current buffer size in bytes.
-
#each {|parsed| ... } ⇒ Enumerator, self
Enumerable interface for iterating over parsed events.
-
#finish ⇒ self
(also: #close)
Signal end-of-input.
-
#initialize(**opts) {|config| ... } ⇒ Stream
constructor
A new instance of Stream.
-
#last_event_id ⇒ String?
The last ‘id:` value seen on the wire.
-
#on_chunk {|chunk| ... } ⇒ Object
Raw chunk as it arrived (after normalization).
-
#on_error {|error| ... } ⇒ Object
Errors raised by any callback or by the parser.
-
#on_event(type: nil) {|event| ... } ⇒ Object
Fully parsed SSE event as a Event.
-
#on_field {|name, value| ... } ⇒ Object
Every parsed SSE field line.
-
#on_frame {|frame| ... } ⇒ Object
Complete frame text (after sanitization), regardless of whether it produces an event.
-
#on_parsed(type: nil) {|parsed| ... } ⇒ Object
Result of running the configured parser over an event’s data.
-
#on_ping {|frame| ... } ⇒ Object
Ping/comment frame.
-
#retry_ms ⇒ Integer, ...
The last ‘retry:` value seen on the wire.
-
#stats ⇒ Hash?
Stream statistics.
Constructor Details
Instance Attribute Details
#config ⇒ ConduitSSE::Config (readonly)
The frozen, validated configuration for this stream.
17 18 19 |
# File 'lib/conduit_sse/stream.rb', line 17 def config @config end |
#state ⇒ ConduitSSE::State (readonly)
The runtime mutable state (buffer, last_event_id, retry_ms, counters).
21 22 23 |
# File 'lib/conduit_sse/stream.rb', line 21 def state @state end |
Instance Method Details
#<<(chunk) ⇒ self
Feed a chunk of data to the stream for processing.
Chunks are typically received from an HTTP stream (e.g., Net::HTTP response body). The chunk is normalized, buffered, and then processed for complete frames. Returns self for method chaining.
190 191 192 193 194 195 196 197 198 199 |
# File 'lib/conduit_sse/stream.rb', line 190 def <<(chunk) chunk = normalize_chunk(chunk) @state.callbacks.emit(:chunk, chunk) @state.buffer << chunk @state.increment_stat(:chunk) process_frames self end |
#buffer_size ⇒ Integer
Current buffer size in bytes.
Returns the size of the internal buffer, useful for monitoring memory usage during long-running streams.
44 45 46 |
# File 'lib/conduit_sse/stream.rb', line 44 def buffer_size @state.buffer_size end |
#each {|parsed| ... } ⇒ Enumerator, self
Enumerable interface for iterating over parsed events.
Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers an on_parsed callback and returns self for chaining.
227 228 229 230 231 232 |
# File 'lib/conduit_sse/stream.rb', line 227 def each(&block) return enum_for(:each) unless block on_parsed(&block) self end |
#finish ⇒ self Also known as: close
Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.
Call this when the underlying transport closes cleanly without a trailing “nn” (typical for many HTTP SSE servers). Safe to call multiple times; safe to call on an empty buffer; safe to keep using the stream afterwards.
209 210 211 212 213 214 215 216 |
# File 'lib/conduit_sse/stream.rb', line 209 def finish buffer = @state.buffer return self if buffer.empty? remainder = buffer.slice!(0, buffer.length) process_frame(remainder) self end |
#last_event_id ⇒ String?
Returns The last ‘id:` value seen on the wire.
33 |
# File 'lib/conduit_sse/stream.rb', line 33 def last_event_id = @state.last_event_id |
#on_chunk {|chunk| ... } ⇒ Object
Raw chunk as it arrived (after normalization).
The chunk is a string that has been normalized (UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.
71 72 73 |
# File 'lib/conduit_sse/stream.rb', line 71 def on_chunk(&block) @state.callbacks.on(:chunk, &block) end |
#on_error {|error| ... } ⇒ Object
Errors raised by any callback or by the parser.
When a callback (other than on_error itself) or the parser raises an error, it’s routed to this handler if registered. This prevents errors from interrupting the stream processing.
If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.
174 175 176 177 178 179 180 |
# File 'lib/conduit_sse/stream.rb', line 174 def on_error(&block) wrapped_block = proc do |error| @state.increment_stat(:error) block.call(error) end @state.callbacks.on(:error, &wrapped_block) end |
#on_event(type: nil) {|event| ... } ⇒ Object
Fully parsed SSE event as a Event.
This callback receives a ConduitSSE::Event object with the following attributes:
-
event: Event type (defaults to “message”)
-
data: Joined data field content (data lines joined by “n”)
-
id: Last event ID from the SSE spec
-
retry: Retry delay in milliseconds from the SSE spec
Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry).
101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/conduit_sse/stream.rb', line 101 def on_event(type: nil, &block) if type wrapped_block = proc do |event| filter_match = Array(type).include?(event.event) next unless filter_match block.call(event) end @state.callbacks.on(:event, &wrapped_block) else @state.callbacks.on(:event, &block) end end |
#on_field {|name, value| ... } ⇒ Object
Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.
Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.
160 161 162 |
# File 'lib/conduit_sse/stream.rb', line 160 def on_field(&block) @state.callbacks.on(:field, &block) end |
#on_frame {|frame| ... } ⇒ Object
Complete frame text (after sanitization), regardless of whether it produces an event.
A frame is the text between frame separators (default: “nn”). This callback receives the raw frame string after sanitization (default: strip). This includes frames that may not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.
83 84 85 |
# File 'lib/conduit_sse/stream.rb', line 83 def on_frame(&block) @state.callbacks.on(:frame, &block) end |
#on_parsed(type: nil) {|parsed| ... } ⇒ Object
Result of running the configured parser over an event’s data.
The parser receives ONLY the data field content (joined by “n”), not the entire frame. If you need access to other SSE fields (event type, id, retry), use on_event instead.
If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/conduit_sse/stream.rb', line 126 def on_parsed(type: nil, &block) if type wrapped_block = proc do |parsed| # We need access to the event type here, but on_parsed receives parsed data # We need to track the last event type to filter properly filter_match = Array(type).include?(@state.last_event_type || "message") next unless filter_match block.call(parsed) end @state.callbacks.on(:parsed, &wrapped_block) else @state.callbacks.on(:parsed, &block) end end |
#on_ping {|frame| ... } ⇒ Object
Ping/comment frame.
Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.
149 150 151 |
# File 'lib/conduit_sse/stream.rb', line 149 def on_ping(&block) @state.callbacks.on(:ping, &block) end |
#retry_ms ⇒ Integer, ...
Returns The last ‘retry:` value seen on the wire.
36 |
# File 'lib/conduit_sse/stream.rb', line 36 def retry_ms = @state.retry_ms |
#stats ⇒ Hash?
Stream statistics.
Returns a hash with counts of all processed items, useful for monitoring and debugging without the overhead of the Inspector’s logging.
Stats are opt-in: pass ‘stats: true` to #initialize to enable tracking. When stats are disabled (the default), this method returns nil and the parser does zero stats bookkeeping per event.
60 61 62 |
# File 'lib/conduit_sse/stream.rb', line 60 def stats @state.stats_snapshot end |