Class: Conduit::Stream
- Inherits: Object
- Defined in: lib/conduit/stream.rb
Overview
Core streaming parser for Server-Sent Events (SSE).
Instance Attribute Summary
- #last_event_id ⇒ Object (readonly)
  Stream state: the last event id seen, per SSE spec semantics.
- #retry_ms ⇒ Object (readonly)
  Stream state: the last retry value seen, per SSE spec semantics.
Instance Method Summary
- #<<(chunk) ⇒ self
  Feed a chunk of data to the stream for processing.
- #buffer_size ⇒ Integer
  Current buffer size in bytes.
- #each {|parsed| ... } ⇒ Enumerator, self
  Enumerable interface for iterating over parsed events.
- #finish ⇒ self (also: #close)
  Signal end-of-input.
- #initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream (constructor)
  Initialize a new Stream with optional customizations.
- #on_chunk {|chunk| ... } ⇒ Object
  Raw chunk as it arrived (after normalization).
- #on_error {|error| ... } ⇒ Object
  Errors raised by any callback or by the parser.
- #on_event(type: nil) {|event| ... } ⇒ Object
  Fully parsed SSE event as an Event.
- #on_field {|name, value| ... } ⇒ Object
  Every parsed SSE field line.
- #on_frame {|frame| ... } ⇒ Object
  Complete frame text (after sanitization), regardless of whether it produces an event.
- #on_parsed(type: nil) {|parsed| ... } ⇒ Object
  Result of running the configured parser over an event’s data.
- #on_ping {|frame| ... } ⇒ Object
  Ping/comment frame.
- #stats ⇒ Hash
  Stream statistics.
Constructor Details
#initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream
Initialize a new Stream with optional customizations.
# File 'lib/conduit/stream.rb', line 18
def initialize(
  parser:,
  chunk_normalizer: nil,
  frame_separator: nil,
  payload_start: nil,
  ping_pattern: nil,
  sanitize_pattern: nil
)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  @parser = parser
  @chunk_normalizer = chunk_normalizer || Defaults::CHUNK_NORMALIZER
  @sanitize_pattern = sanitize_pattern || Defaults::SANITIZE_PATTERN
  @frame_separator = frame_separator || Defaults::FRAME_SEPARATOR
  @payload_start = payload_start || Defaults::PAYLOAD_START
  @ping_pattern = ping_pattern || Defaults::PING_PATTERN
  @data_field = @payload_start.chomp(":")
  @buffer = +""
  @callbacks = Callbacks.new
  @last_event_id = nil
  @retry_ms = nil
  @last_event_type = nil
  @stats = Hash.new(0)
  @total_fields = 0
end
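The only check the constructor makes on parser is respond_to?(:call), so any callable should qualify, not only a Proc. The sketch below reproduces that guard in isolation; validate_parser! is a hypothetical helper name used for illustration and is not part of Conduit.

```ruby
require "json"

# Hypothetical helper mirroring the guard in Stream#initialize.
def validate_parser!(parser)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  parser
end

# Both a lambda and a Method object respond to #call.
validate_parser!(->(data) { JSON.parse(data) })
validate_parser!(JSON.method(:parse))

# A plain String does not, so the guard raises ArgumentError.
begin
  validate_parser!("not callable")
rescue ArgumentError => e
  warn "rejected: #{e.message}"
end
```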
Instance Attribute Details
#last_event_id ⇒ Object (readonly)
Stream state: the last event id seen, per SSE spec semantics.
# File 'lib/conduit/stream.rb', line 45
def last_event_id
  @last_event_id
end
#retry_ms ⇒ Object (readonly)
Stream state: the last retry value seen, per SSE spec semantics.
# File 'lib/conduit/stream.rb', line 45
def retry_ms
  @retry_ms
end
Instance Method Details
#<<(chunk) ⇒ self
Feed a chunk of data to the stream for processing.
Chunks are typically received from an HTTP stream (e.g., Net::HTTP response body). The chunk is normalized, buffered, and then processed for complete frames. Returns self for method chaining.
# File 'lib/conduit/stream.rb', line 196
def <<(chunk)
  chunk = normalize_chunk(chunk)
  @callbacks.emit(:chunk, chunk)

  @buffer << chunk
  @stats[:chunk] += 1

  process_frames
  self
end
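The private process_frames step is not shown on this page. As a rough sketch of the buffer-then-split idea, assuming the frame separator is the plain string "\n\n", the hypothetical drain_frames helper below removes every complete frame from a buffer and leaves any partial frame in place for the next chunk.

```ruby
FRAME_SEPARATOR = "\n\n" # assumed default, taken from the on_frame description

# Removes each complete frame from buffer (mutating it) and returns them.
# Hypothetical helper; not Conduit's implementation.
def drain_frames(buffer, separator = FRAME_SEPARATOR)
  frames = []
  while (index = buffer.index(separator))
    frames << buffer.slice!(0, index)   # frame text without the separator
    buffer.slice!(0, separator.length)  # drop the separator itself
  end
  frames
end

buffer = +""
buffer << "data: hel"
drain_frames(buffer) # => []
buffer << "lo\n\ndata: wor"
drain_frames(buffer) # => ["data: hello"]
buffer               # => "data: wor"
```

A frame split across two chunks is only emitted once its separator arrives, which is why the first call returns nothing.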
#buffer_size ⇒ Integer
Current buffer size in bytes.
Returns the size of the internal buffer, useful for monitoring memory usage during long-running streams.
# File 'lib/conduit/stream.rb', line 53
def buffer_size
  @buffer.bytesize
end
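Because the method uses String#bytesize, the result counts bytes rather than characters, and the two differ as soon as the buffer holds multibyte UTF-8 text. A small illustration using plain Ruby strings (the sample content is made up):

```ruby
buffer = +"data: caf\u00e9" # "é" occupies two bytes in UTF-8

buffer.length   # => 10 (characters)
buffer.bytesize # => 11 (bytes, which is what buffer_size reports)
```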
#each {|parsed| ... } ⇒ Enumerator, self
Enumerable interface for iterating over parsed events.
Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers an on_parsed callback and returns self for chaining.
# File 'lib/conduit/stream.rb', line 232
def each(&block)
  return enum_for(:each) unless block

  on_parsed(&block)
  self
end
#finish ⇒ self Also known as: close
Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.
Call this when the underlying transport closes cleanly without a trailing “\n\n” (typical for many HTTP SSE servers). Safe to call multiple times; safe to call on an empty buffer; safe to keep using the stream afterwards.
# File 'lib/conduit/stream.rb', line 215
def finish
  return self if @buffer.empty?

  remainder = @buffer.slice!(0, @buffer.length)
  process_frame(remainder)
  self
end
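To make the flush behaviour concrete, here is a minimal stand-in that only buffers and splits frames. TinyFramer is a hypothetical class written for this example, assuming a "\n\n" string separator; it is not Conduit's code and skips sanitization, pings, and callbacks.

```ruby
# Hypothetical stand-in for the buffering half of a stream.
class TinyFramer
  attr_reader :frames

  def initialize(separator = "\n\n")
    @separator = separator
    @buffer = +""
    @frames = []
  end

  def <<(chunk)
    @buffer << chunk
    while (index = @buffer.index(@separator))
      @frames << @buffer.slice!(0, index)
      @buffer.slice!(0, @separator.length)
    end
    self
  end

  # Flush whatever is left as one final frame; a no-op on an empty buffer.
  def finish
    return self if @buffer.empty?

    @frames << @buffer.slice!(0, @buffer.length)
    self
  end
end

framer = TinyFramer.new
framer << "data: one\n\ndata: two"
framer.frames        # => ["data: one"]
framer.finish.finish # the second call finds an empty buffer and does nothing
framer.frames        # => ["data: one", "data: two"]
```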
#on_chunk {|chunk| ... } ⇒ Object
Raw chunk as it arrived (after normalization).
The chunk is a string that has been normalized (UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.
# File 'lib/conduit/stream.rb', line 77
def on_chunk(&block)
  @callbacks.on(:chunk, &block)
end
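Defaults::CHUNK_NORMALIZER is not shown on this page, so the following is only a guess at what a normalizer with the described effect (UTF-8 encoding, CRLF→LF) could look like. The name sketch_normalize and the choice to scrub invalid bytes are assumptions.

```ruby
# Hypothetical normalizer: tag the bytes as UTF-8, replace invalid
# sequences, and convert CRLF line endings to LF.
# A CRLF pair split across two chunks is not handled by this sketch.
def sketch_normalize(chunk)
  chunk.to_s.dup.force_encoding(Encoding::UTF_8).scrub.gsub("\r\n", "\n")
end

raw = "data: a\r\n\r\n".b        # binary-encoded, as a socket read might return
normalized = sketch_normalize(raw)
normalized          # => "data: a\n\n"
normalized.encoding # => #<Encoding:UTF-8>
```

A callable of this shape is the kind of object the chunk_normalizer: option appears to expect, although the exact contract is not documented here.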
#on_error {|error| ... } ⇒ Object
Errors raised by any callback or by the parser.
When a callback (other than on_error itself) or the parser raises an error, it’s routed to this handler if registered. This prevents errors from interrupting the stream processing.
If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.
# File 'lib/conduit/stream.rb', line 180
def on_error(&block)
  wrapped_block = proc do |error|
    @stats[:error] += 1
    block.call(error)
  end
  @callbacks.on(:error, &wrapped_block)
end
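The Callbacks class that does the routing is not documented on this page. The sketch below shows one way the stated rule could be implemented: route to :error handlers when any exist, otherwise re-raise, and always re-raise when an :error handler itself fails. TinyCallbacks is a hypothetical class for illustration only.

```ruby
# Hypothetical dispatcher illustrating the routing rule described above.
class TinyCallbacks
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def on(name, &block)
    @handlers[name] << block
  end

  def emit(name, *args)
    @handlers[name].each do |handler|
      handler.call(*args)
    rescue StandardError => e
      # Bubble up when the failing handler is itself an error handler,
      # or when no error handler is registered.
      raise if name == :error || @handlers[:error].empty?

      emit(:error, e)
    end
  end
end

callbacks = TinyCallbacks.new
errors = []
seen = []
callbacks.on(:error)  { |error| errors << error.message }
callbacks.on(:parsed) { |_value| raise "boom" }
callbacks.on(:parsed) { |value| seen << value }

callbacks.emit(:parsed, 1)
errors # => ["boom"]
seen   # => [1], processing continued after the failing handler
```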
#on_event(type: nil) {|event| ... } ⇒ Object
Fully parsed SSE event as an Event.
This callback receives a Conduit::Event object with the following attributes:
- event: Event type (defaults to “message”)
- data: Joined data field content (data lines joined by “\n”)
- id: Last event ID from the SSE spec
- retry: Retry delay in milliseconds from the SSE spec
Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry). Pass type: (a single type or an array of types) to receive only events whose type matches.
# File 'lib/conduit/stream.rb', line 107
def on_event(type: nil, &block)
  if type
    wrapped_block = proc do |event|
      filter_match = Array(type).include?(event.event)
      next unless filter_match

      block.call(event)
    end
    @callbacks.on(:event, &wrapped_block)
  else
    @callbacks.on(:event, &block)
  end
end
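The attribute list above can be illustrated with a small stand-in. SketchEvent and build_event are hypothetical names, and the sketch simplifies by reading id and retry from the same frame rather than from stream-level state; the real Conduit::Event may be assembled differently.

```ruby
# Hypothetical stand-in for Conduit::Event; member names follow the list above.
SketchEvent = Struct.new(:event, :data, :id, :retry, keyword_init: true)

# fields is an array of [name, value] pairs already parsed from one frame.
def build_event(fields)
  data_lines = fields.select { |name, _value| name == "data" }.map(&:last)
  return nil if data_lines.empty? # no data field, so no event

  lookup = fields.to_h # for repeated names, the last value wins
  SketchEvent.new(
    event: lookup.fetch("event", "message"),
    data: data_lines.join("\n"),
    id: lookup["id"],
    retry: Integer(lookup["retry"].to_s, exception: false)
  )
end

event = build_event([%w[event update], %w[id 7], ["data", "line one"], ["data", "line two"]])
event.event # => "update"
event.data  # => "line one\nline two"

# The same membership test the type: filter applies in the listing above.
Array(%w[update delete]).include?(event.event) # => true
Array("delete").include?(event.event)          # => false
```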
#on_field {|name, value| ... } ⇒ Object
Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.
Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.
# File 'lib/conduit/stream.rb', line 166
def on_field(&block)
  @callbacks.on(:field, &block)
end
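As a sketch of the “name: value” line format, the hypothetical each_field helper below splits a frame into field pairs. It follows the usual SSE conventions (split at the first colon, drop one leading space from the value, skip comment lines); whether Conduit treats every edge case the same way is not stated on this page.

```ruby
# Yields (name, value) for each field line in a frame.
# Hypothetical helper; not Conduit's parser.
def each_field(frame)
  return enum_for(:each_field, frame) unless block_given?

  frame.each_line(chomp: true) do |line|
    next if line.empty? || line.start_with?(":") # blank or comment line

    name, value = line.split(":", 2)
    yield name, value.to_s.delete_prefix(" ")
  end
end

each_field("event: tick\ndata: 1\nx-trace: abc").to_a
# => [["event", "tick"], ["data", "1"], ["x-trace", "abc"]]
```

Custom fields such as the made-up x-trace line come through the same path as the standard ones.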
#on_frame {|frame| ... } ⇒ Object
Complete frame text (after sanitization), regardless of whether it produces an event.
A frame is the text between frame separators (default: “\n\n”). This callback receives the raw frame string after sanitization (default: strip). This includes frames that may not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.
# File 'lib/conduit/stream.rb', line 89
def on_frame(&block)
  @callbacks.on(:frame, &block)
end
#on_parsed(type: nil) {|parsed| ... } ⇒ Object
Result of running the configured parser over an event’s data.
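The parser receives ONLY the data field content (joined by “\n”), not the entire frame. If you need access to other SSE fields (event type, id, retry), use on_event instead.
If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event. When type: is given (a single type or an array of types), the block runs only for events of a matching type; events without an explicit type count as “message”.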
# File 'lib/conduit/stream.rb', line 132
def on_parsed(type: nil, &block)
  if type
    wrapped_block = proc do |parsed|
      # We need access to the event type here, but on_parsed receives parsed data
      # We need to track the last event type to filter properly
      filter_match = Array(type).include?(@last_event_type || "message")
      next unless filter_match

      block.call(parsed)
    end
    @callbacks.on(:parsed, &wrapped_block)
  else
    @callbacks.on(:parsed, &block)
  end
end
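The interaction between the parser, on_parsed, and on_error can be sketched without the library. In the example below, run_parser is a hypothetical helper and JSON.parse is just one possible parser; it shows the parser receiving only the data string and a parser failure skipping the parsed callback.

```ruby
require "json"

PARSER = ->(data) { JSON.parse(data) } # any callable taking the data string

# Hypothetical helper: run the parser, then hand the result to on_parsed.
# A parser failure goes to on_error when given, otherwise it bubbles up.
def run_parser(data, parser:, on_parsed:, on_error: nil)
  parsed = parser.call(data)
rescue StandardError => e
  raise unless on_error

  on_error.call(e)
else
  on_parsed.call(parsed)
end

results = []
failures = []
collect = ->(value) { results << value }
report = ->(error) { failures << error.class }

run_parser('{"n": 1}', parser: PARSER, on_parsed: collect, on_error: report)
run_parser("not json", parser: PARSER, on_parsed: collect, on_error: report)

results  # => [{"n" => 1}]
failures # => [JSON::ParserError]
```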
#on_ping {|frame| ... } ⇒ Object
Ping/comment frame.
Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.
# File 'lib/conduit/stream.rb', line 155
def on_ping(&block)
  @callbacks.on(:ping, &block)
end
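How ping_pattern is matched against a frame is not shown on this page. Assuming the default “:” means "the frame starts with a colon" (the SSE comment syntax), the dispatch could be sketched as follows; classify_frame and PING_PREFIX are hypothetical names.

```ruby
PING_PREFIX = ":" # assumed meaning of the default ping_pattern

# Decide which callback family a sanitized frame would go to.
def classify_frame(frame)
  frame.start_with?(PING_PREFIX) ? :ping : :frame
end

classify_frame(": keep-alive") # => :ping
classify_frame("data: hello")  # => :frame
```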
#stats ⇒ Hash
Stream statistics.
Returns a hash with counts of all processed items, useful for monitoring and debugging without the overhead of the Inspector’s logging.
# File 'lib/conduit/stream.rb', line 63
def stats
  stats = @stats.dup
  avg_fields = @stats[:frame].positive? ? (@total_fields.to_f / @stats[:frame]).round(2) : 0
  stats[:avg_fields_per_frame] = avg_fields
  stats
end
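A worked example of the avg_fields_per_frame calculation, using a standalone copy of the same arithmetic. The summarize helper and the sample counts are made up for illustration.

```ruby
# Same arithmetic as #stats, applied to explicit inputs.
def summarize(counts, total_fields)
  summary = counts.dup
  frames = counts[:frame]
  summary[:avg_fields_per_frame] = frames.positive? ? (total_fields.to_f / frames).round(2) : 0
  summary
end

counts = Hash.new(0)
counts[:frame] += 3

summarize(counts, 7)[:avg_fields_per_frame]      # => 2.33 (7 fields over 3 frames)
summarize(Hash.new(0), 0)[:avg_fields_per_frame] # => 0 (no frames, so no division)
```

Because the counters come from Hash.new(0), a key that was never incremented reads as 0 instead of nil.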