Class: ConduitSSE::Stream

Inherits:
Object
Defined in:
lib/conduit_sse/stream.rb

Overview

Core streaming parser for Server-Sent Events (SSE).

See Config for the full list of configuration knobs and the two equivalent constructor forms (kwargs and block).

Constructor Details

#initialize(**opts) {|config| ... } ⇒ Stream

Returns a new instance of Stream.

Yield Parameters:

  • config (ConduitSSE::Config)

    Mutable configuration object; any values set in the block win over the kwargs.



# File 'lib/conduit_sse/stream.rb', line 25

def initialize(**opts, &block)
  @config = Config.new(**opts)
  block&.call(@config)
  @config.finalize!
  @state = State.new(stats_enabled: @config.stats)
end
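
The ordering in the constructor (kwargs first, then the block, then finalization) can be sketched in isolation. The example below is a minimal stand-in rather than the library's Config: SketchConfig, its separator and stats fields, its defaults, and SketchStream are hypothetical names used only to show why a value set in the block overrides the same value passed as a kwarg.

```ruby
# Hypothetical stand-in for a configuration object; the real
# ConduitSSE::Config and its options are not reproduced here.
SketchConfig = Struct.new(:separator, :stats, keyword_init: true) do
  # Assumed finalization step: fill in defaults, then freeze.
  def finalize!
    self.separator ||= "\n\n"
    self.stats = false if stats.nil?
    freeze
  end
end

class SketchStream
  attr_reader :config

  def initialize(**opts, &block)
    @config = SketchConfig.new(**opts) # 1. kwargs are applied first
    block&.call(@config)               # 2. the block may overwrite them
    @config.finalize!                  # 3. finalized and frozen last
  end
end

stream = SketchStream.new(stats: false) { |c| c.stats = true }
puts stream.config.stats    # the block value overrides the kwarg
puts stream.config.frozen?  # the config can no longer be mutated
```

Because the block runs after the kwargs are applied and before finalization, it is the last writer, and the frozen result is what the config reader exposes.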

Instance Attribute Details

#config ⇒ ConduitSSE::Config (readonly)

The frozen, validated configuration for this stream.

Returns:

  • (ConduitSSE::Config)

# File 'lib/conduit_sse/stream.rb', line 17

def config
  @config
end

#state ⇒ ConduitSSE::State (readonly)

The runtime mutable state (buffer, last_event_id, retry_ms, counters).

Returns:

  • (ConduitSSE::State)

# File 'lib/conduit_sse/stream.rb', line 21

def state
  @state
end

Instance Method Details

#<<(chunk) ⇒ self

Feed a chunk of data to the stream for processing.

Chunks are typically received from an HTTP stream (e.g., Net::HTTP response body). The chunk is normalized, buffered, and then processed for complete frames. Returns self for method chaining.

Parameters:

  • chunk (String)

    Raw data chunk from the stream

Returns:

  • (self)


# File 'lib/conduit_sse/stream.rb', line 190

def <<(chunk)
  chunk = normalize_chunk(chunk)

  @state.callbacks.emit(:chunk, chunk)
  @state.buffer << chunk
  @state.increment_stat(:chunk)

  process_frames
  self
end
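
The normalize, buffer, then extract-complete-frames flow can be illustrated with a small self-contained sketch. SketchBuffer is a hypothetical class, not the library's implementation; it assumes a "\n\n" separator and strip-based sanitization, and it does not handle a CRLF pair that is split across two chunks.

```ruby
# Minimal sketch of chunk buffering; hypothetical, not ConduitSSE code.
class SketchBuffer
  SEPARATOR = "\n\n" # assumed default frame separator

  attr_reader :frames

  def initialize
    @buffer = +""
    @frames = []
  end

  def <<(chunk)
    # Normalize: tag as UTF-8 and convert CRLF to LF.
    @buffer << chunk.dup.force_encoding(Encoding::UTF_8).gsub("\r\n", "\n")

    # Extract every complete frame; a partial frame stays buffered.
    while (index = @buffer.index(SEPARATOR))
      frame = @buffer.slice!(0, index + SEPARATOR.length)
      @frames << frame.strip
    end
    self # returning self is what makes `buf << a << b` work
  end

  def buffer_size
    @buffer.bytesize
  end
end

buf = SketchBuffer.new
buf << "data: hel" << "lo\r\n\r\ndata: par"
p buf.frames      # one complete frame so far
p buf.buffer_size # bytes still waiting for a separator
```

The first chunk produces nothing because no separator has arrived yet. The second chunk completes one frame and leaves the start of the next in the buffer.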

#buffer_size ⇒ Integer

Current buffer size in bytes.

Returns the size of the internal buffer, useful for monitoring memory usage during long-running streams.

Returns:

  • (Integer)

    Buffer size in bytes



# File 'lib/conduit_sse/stream.rb', line 44

def buffer_size
  @state.buffer_size
end

#each {|parsed| ... } ⇒ Enumerator, self

Enumerable interface for iterating over parsed events.

Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers an on_parsed callback and returns self for chaining.

Yields:

  • (parsed)

    The result of your parser

Returns:

  • (Enumerator, self)


# File 'lib/conduit_sse/stream.rb', line 227

def each(&block)
  return enum_for(:each) unless block

  on_parsed(&block)
  self
end

#finish ⇒ self

Also known as: close

Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.

Call this when the underlying transport closes cleanly without a trailing “\n\n” (typical for many HTTP SSE servers). Safe to call multiple times; safe to call on an empty buffer; safe to keep using the stream afterwards.

Returns:

  • (self)


# File 'lib/conduit_sse/stream.rb', line 209

def finish
  buffer = @state.buffer
  return self if buffer.empty?

  remainder = buffer.slice!(0, buffer.length)
  process_frame(remainder)
  self
end
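
The flush behaviour can be shown on a plain String buffer. flush_remainder below is a hypothetical helper, not part of ConduitSSE; it mirrors the drain-and-process steps of the method source and returns the number of frames it flushed so the no-op case is visible.

```ruby
# Hypothetical helper: drains whatever is left in the buffer and hands it
# to a frame handler as one final frame.
def flush_remainder(buffer, &handler)
  return 0 if buffer.empty? # nothing to flush, so this is a safe no-op

  remainder = buffer.slice!(0, buffer.length) # empties the buffer in place
  handler.call(remainder)
  1
end

buffer = +"data: last event" # transport closed before the final "\n\n"
seen = []

flush_remainder(buffer) { |frame| seen << frame }
flush_remainder(buffer) { |frame| seen << frame } # second call does nothing

p seen
p buffer
```

Draining with slice! leaves the buffer empty, which is why a repeated call is harmless and why the buffer can keep receiving chunks afterwards.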

#last_event_id ⇒ String?

Returns the last `id:` value seen on the wire.

Returns:

  • (String, nil)

    The last `id:` value seen on the wire.



# File 'lib/conduit_sse/stream.rb', line 33

def last_event_id = @state.last_event_id

#on_chunk {|chunk| ... } ⇒ Object

Raw chunk as it arrived (after normalization).

The chunk is a string that has been normalized (UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.

Yields:

  • (chunk)

    The normalized chunk string



# File 'lib/conduit_sse/stream.rb', line 71

def on_chunk(&block)
  @state.callbacks.on(:chunk, &block)
end

#on_error {|error| ... } ⇒ Object

Errors raised by any callback or by the parser.

When a callback (other than on_error itself) or the parser raises an error, it’s routed to this handler if registered. This prevents errors from interrupting the stream processing.

If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.

Yields:

  • (error)

    The exception that was raised



# File 'lib/conduit_sse/stream.rb', line 174

def on_error(&block)
  wrapped_block = proc do |error|
    @state.increment_stat(:error)
    block.call(error)
  end
  @state.callbacks.on(:error, &wrapped_block)
end
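
The routing rule (errors go to the error handler when one is registered, and bubble up otherwise or when the error handler itself raises) can be sketched with a tiny dispatcher. SketchCallbacks is hypothetical and is not the library's callback registry; only the on and emit method names are taken from the source listings.

```ruby
# Hypothetical dispatcher illustrating the error-routing rule.
class SketchCallbacks
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def on(name, &block)
    @handlers[name] << block
  end

  def emit(name, payload)
    @handlers[name].each do |handler|
      begin
        handler.call(payload)
      rescue StandardError => e
        # No error handler, or the error handler itself failed: bubble up.
        raise if name == :error || @handlers[:error].empty?

        emit(:error, e)
      end
    end
  end
end

callbacks = SketchCallbacks.new
errors = []
callbacks.on(:event) { |_event| raise ArgumentError, "bad payload" }
callbacks.on(:error) { |error| errors << error.message }

callbacks.emit(:event, "data: x") # does not raise; the error is routed
p errors
```

Registering the error handler is what turns a failing callback from a fatal interruption into a recorded event, so long-running consumers usually want one in place before feeding data.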

#on_event(type: nil) {|event| ... } ⇒ Object

Fully parsed SSE event as an Event.

This callback receives a ConduitSSE::Event object with the following attributes:

  • event: Event type (defaults to “message”)

  • data: Joined data field content (data lines joined by “\n”)

  • id: Last event ID from the SSE spec

  • retry: Retry delay in milliseconds from the SSE spec

Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry).

Parameters:

  • type (String, Array<String>, nil) (defaults to: nil)

    Optional event type(s) to filter by. If provided, the callback only triggers for matching event types.

Yields:

  • (event)

    A ConduitSSE::Event object



# File 'lib/conduit_sse/stream.rb', line 101

def on_event(type: nil, &block)
  if type
    wrapped_block = proc do |event|
      filter_match = Array(type).include?(event.event)
      next unless filter_match

      block.call(event)
    end
    @state.callbacks.on(:event, &wrapped_block)
  else
    @state.callbacks.on(:event, &block)
  end
end
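
The type filter is a thin wrapper around the handler. The sketch below reproduces only that wrapping logic with hypothetical names: SketchEvent is a partial stand-in for ConduitSSE::Event (three of its attributes), and filter_by_type is not a library method.

```ruby
# Partial, hypothetical stand-in for ConduitSSE::Event.
SketchEvent = Struct.new(:event, :data, :id, keyword_init: true)

# Wraps a handler so it only runs for matching event types, using the
# same Array(type).include? test as the method source.
def filter_by_type(type, &handler)
  return handler if type.nil? # no filter: use the handler unchanged

  allowed = Array(type) # accepts a single String or an Array of Strings
  proc { |event| handler.call(event) if allowed.include?(event.event) }
end

seen = []
handler = filter_by_type(%w[update delete]) { |e| seen << e.data }

handler.call(SketchEvent.new(event: "update", data: "a"))
handler.call(SketchEvent.new(event: "message", data: "b")) # filtered out
handler.call(SketchEvent.new(event: "delete", data: "c"))
p seen
```

Array(type) is what lets the same keyword accept either one type or several without a separate code path.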

#on_field {|name, value| ... } ⇒ Object

Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.

Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.

Yields:

  • (name, value)

    The field name and value as strings



# File 'lib/conduit_sse/stream.rb', line 160

def on_field(&block)
  @state.callbacks.on(:field, &block)
end
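
For readers unfamiliar with the wire format, the field-line rules can be sketched as a standalone function. parse_field_line is a hypothetical helper that follows the general SSE rules (split on the first colon, drop one leading space from the value, treat a line starting with a colon as a comment); it is not the library's parser and may differ from it in edge cases.

```ruby
# Sketch of SSE field-line parsing; hypothetical, not ConduitSSE code.
def parse_field_line(line)
  return nil if line.empty? || line.start_with?(":") # blank or comment line

  name, value = line.split(":", 2) # split on the first colon only
  value ||= ""                     # no colon: the whole line is the name
  [name, value.delete_prefix(" ")] # strip exactly one leading space
end

frame = "event: update\ndata: {\"n\": 1}\nx-trace: abc\n: keep-alive"
fields = frame.lines(chomp: true).map { |line| parse_field_line(line) }.compact

fields.each { |name, value| puts "#{name} => #{value}" }
```

Custom fields such as the made-up x-trace line come through the same path as the standard ones, which matches the (name, value) pairs this callback is described as yielding.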

#on_frame {|frame| ... } ⇒ Object

Complete frame text (after sanitization), regardless of whether it produces an event.

A frame is the text between frame separators (default: “\n\n”). This callback receives the raw frame string after sanitization (default: strip). This includes frames that may not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.

Yields:

  • (frame)

    The sanitized frame string



# File 'lib/conduit_sse/stream.rb', line 83

def on_frame(&block)
  @state.callbacks.on(:frame, &block)
end

#on_parsed(type: nil) {|parsed| ... } ⇒ Object

Result of running the configured parser over an event’s data.

The parser receives ONLY the data field content (joined by “\n”), not the entire frame. If you need access to other SSE fields (event type, id, retry), use on_event instead.

If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event.

Parameters:

  • type (String, Array<String>, nil) (defaults to: nil)

    Optional event type(s) to filter by. If provided, the callback only triggers for matching event types.

Yields:

  • (parsed)

    Whatever your parser lambda returns



# File 'lib/conduit_sse/stream.rb', line 126

def on_parsed(type: nil, &block)
  if type
    wrapped_block = proc do |parsed|
      # The :parsed callback receives only the parser's output, so filter on
      # the type of the most recently processed event, tracked in @state.
      filter_match = Array(type).include?(@state.last_event_type || "message")
      next unless filter_match

      block.call(parsed)
    end
    @state.callbacks.on(:parsed, &wrapped_block)
  else
    @state.callbacks.on(:parsed, &block)
  end
end
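
What "only the data field content" means in practice can be shown with a short sketch. joined_data is a hypothetical helper and the JSON lambda is an assumed parser; neither is taken from the library, whose parser configuration is documented under Config.

```ruby
require "json"

# Hypothetical helper: collects the data lines of one frame and joins
# them with "\n", ignoring every other field.
def joined_data(frame)
  frame.lines(chomp: true)
       .select { |line| line.start_with?("data:") }
       .map { |line| line.delete_prefix("data:").delete_prefix(" ") }
       .join("\n")
end

parser = ->(data) { JSON.parse(data) } # assumed parser lambda

frame = "event: update\nid: 7\ndata: {\"user\":\ndata:  \"ada\"}"
payload = joined_data(frame)
parsed = parser.call(payload)

p payload # the event and id lines never reach the parser
p parsed
```

The event and id lines are dropped before parsing, so a handler that needs them has to read them from the event object rather than from the parsed result.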

#on_ping {|frame| ... } ⇒ Object

Ping/comment frame.

Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.

Yields:

  • (frame)

    The ping frame string



# File 'lib/conduit_sse/stream.rb', line 149

def on_ping(&block)
  @state.callbacks.on(:ping, &block)
end
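
The split between ping frames and ordinary frames amounts to one pattern test before dispatch. In the sketch below, PING_PATTERN and route_frame are hypothetical; the pattern assumes that a frame beginning with ":" counts as a ping, which may not match the library's default exactly.

```ruby
# Assumption: a frame whose first character is ":" is a ping/comment frame.
PING_PATTERN = /\A:/

# Hypothetical router: sends a frame to exactly one of two handlers.
def route_frame(frame, handlers)
  kind = frame.match?(PING_PATTERN) ? :ping : :frame
  handlers.fetch(kind).call(frame)
  kind
end

pings = []
frames = []
handlers = { ping: ->(f) { pings << f }, frame: ->(f) { frames << f } }

route_frame(": keep-alive", handlers)
route_frame("data: hello", handlers)

p pings
p frames
```

Routing to exactly one handler is what keeps keep-alive traffic out of the frame and event callbacks.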

#retry_ms ⇒ Integer, ...

Returns the last `retry:` value seen on the wire.

Returns:

  • (Integer, String, nil)

    The last `retry:` value seen on the wire.



# File 'lib/conduit_sse/stream.rb', line 36

def retry_ms = @state.retry_ms

#stats ⇒ Hash?

Stream statistics.

Returns a hash with counts of all processed items, useful for monitoring and debugging without the overhead of the Inspector’s logging.

Stats are opt-in: pass `stats: true` to #initialize to enable tracking. When stats are disabled (the default), this method returns nil and the parser does zero stats bookkeeping per event.

Returns:

  • (Hash, nil)

    Statistics hash with keys: :chunk, :frame, :event, :parsed, :ping, :field, :error, :avg_fields_per_frame; or nil when stats tracking is disabled.



# File 'lib/conduit_sse/stream.rb', line 60

def stats
  @state.stats_snapshot
end
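
The opt-in behaviour can be sketched with a counter object that allocates nothing when disabled. SketchStats is hypothetical and is not the library's State; in particular, computing avg_fields_per_frame as fields divided by frames is an assumption about what that key means.

```ruby
# Hypothetical opt-in counter set; not ConduitSSE code.
class SketchStats
  KEYS = %i[chunk frame event parsed ping field error].freeze

  def initialize(enabled:)
    @counts = enabled ? KEYS.to_h { |key| [key, 0] } : nil
  end

  def increment(key)
    return unless @counts # disabled: no bookkeeping at all

    @counts[key] += 1
  end

  def snapshot
    return nil unless @counts

    frames = @counts[:frame]
    # Assumed definition: average number of field lines per frame.
    average = frames.zero? ? 0.0 : @counts[:field].to_f / frames
    @counts.merge(avg_fields_per_frame: average)
  end
end

stats = SketchStats.new(enabled: true)
stats.increment(:frame)
3.times { stats.increment(:field) }

p stats.snapshot[:avg_fields_per_frame]
p SketchStats.new(enabled: false).snapshot
```

Returning nil rather than an empty hash makes the disabled state unambiguous to callers, at the cost of a nil check before reading any key.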