Class: Conduit::Stream

Inherits:
Object
Defined in:
lib/conduit/stream.rb

Overview

Core streaming parser for Server-Sent Events (SSE).

Constructor Details

#initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream

Initialize a new Stream. Only parser is required; every other option that is nil falls back to the corresponding Defaults constant.

Parameters:

  • parser (Proc)

    Required. Any object that responds to #call (a Proc, lambda, or Method). It receives the joined data string of an SSE event and may return any value; that value is passed to on_parsed callbacks.

  • chunk_normalizer (Proc) (defaults to: nil)

    Optional. Transforms each incoming chunk before it is buffered. When nil, Defaults::CHUNK_NORMALIZER is used.

  • frame_separator (String) (defaults to: nil)

    Optional. Delimiter that separates frames in the stream. When nil, Defaults::FRAME_SEPARATOR (“\n\n”) is used.

  • payload_start (String) (defaults to: nil)

    Optional. Prefix that identifies data field lines. The field name is derived by stripping the trailing “:” (for example, “data:” yields “data”). When nil, Defaults::PAYLOAD_START is used.

  • ping_pattern (String) (defaults to: nil)

    Optional. Pattern identifying ping (comment) frames. When nil, Defaults::PING_PATTERN (“:”) is used.

  • sanitize_pattern (Proc) (defaults to: nil)

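    Optional. Callable that cleans or validates each frame's text before it is handled (despite the name, it is a Proc, not a pattern). When nil, Defaults::SANITIZE_PATTERN (strip) is used.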

Raises:

  • (ArgumentError)

    If parser does not respond to #call.


# File 'lib/conduit/stream.rb', line 18

def initialize(
  parser:,
  chunk_normalizer: nil,
  frame_separator: nil,
  payload_start: nil,
  ping_pattern: nil,
  sanitize_pattern: nil
)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  @parser           = parser
  @chunk_normalizer = chunk_normalizer || Defaults::CHUNK_NORMALIZER
  @sanitize_pattern = sanitize_pattern || Defaults::SANITIZE_PATTERN
  @frame_separator  = frame_separator  || Defaults::FRAME_SEPARATOR
  @payload_start    = payload_start    || Defaults::PAYLOAD_START
  @ping_pattern     = ping_pattern     || Defaults::PING_PATTERN
  @data_field       = @payload_start.chomp(":")
  @buffer           = +""
  @callbacks        = Callbacks.new
  @last_event_id    = nil
  @retry_ms         = nil
  @last_event_type  = nil
  @stats            = Hash.new(0)
  @total_fields     = 0
end
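The constructor only checks parser.respond_to?(:call), so a lambda, a Method object, or any object defining #call is accepted. A few such parsers are sketched below; check_parser! and LineCounter are hypothetical names, and check_parser! merely mirrors the guard in #initialize rather than being part of Conduit.

```ruby
require "json"

# A lambda that decodes each event's data as JSON.
json_parser = ->(data) { JSON.parse(data) }

# Any plain object with a #call method also passes the check.
class LineCounter
  def call(data)
    data.count("\n") + 1
  end
end

# Hypothetical helper mirroring the guard in Stream#initialize.
def check_parser!(parser)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  parser
end

check_parser!(json_parser)
check_parser!(LineCounter.new)
check_parser!(method(:Integer)) # a Method object is callable too

begin
  check_parser!("not callable")
rescue ArgumentError => e
  warn e.message
end
```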

Instance Attribute Details

#last_event_id ⇒ Object (readonly)

Stream state: the most recent event ID seen, following SSE spec semantics.



# File 'lib/conduit/stream.rb', line 45

def last_event_id
  @last_event_id
end

#retry_ms ⇒ Object (readonly)

Stream state: the most recent retry value seen, in milliseconds, following SSE spec semantics.



# File 'lib/conduit/stream.rb', line 45

def retry_ms
  @retry_ms
end
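Under the SSE spec, a reconnecting client sends the last seen id back in a Last-Event-ID request header and waits for the most recent retry delay. A minimal sketch of feeding these two readers into such a reconnect, assuming a hypothetical Net::HTTP-based flow; StreamState is a hypothetical Struct standing in for a Conduit::Stream so the example runs on its own, and the 3-second fallback is an arbitrary choice.

```ruby
require "net/http"
require "uri"

# Hypothetical stand-in exposing the same two readers as Conduit::Stream.
StreamState = Struct.new(:last_event_id, :retry_ms)

# Build the reconnection request; Last-Event-ID is only sent once an id was seen.
def reconnect_request(uri, state)
  request = Net::HTTP::Get.new(uri)
  request["Accept"] = "text/event-stream"
  request["Last-Event-ID"] = state.last_event_id if state.last_event_id
  request
end

# Convert the server-provided retry (milliseconds) to seconds, with a fallback.
def reconnect_delay(state, fallback_seconds: 3.0)
  state.retry_ms ? state.retry_ms / 1000.0 : fallback_seconds
end

state = StreamState.new("42", 1500)
request = reconnect_request(URI("https://example.com/events"), state)
puts request["Last-Event-ID"] # prints 42
puts reconnect_delay(state)   # prints 1.5
```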

Instance Method Details

#<<(chunk) ⇒ self

Feed a chunk of data to the stream for processing.

Chunks typically come from an HTTP stream (e.g., a Net::HTTP response body). Each chunk is normalized, passed to on_chunk callbacks, appended to the buffer, and then scanned for complete frames. Returns self, so calls can be chained.

Parameters:

  • chunk (String)

    Raw data chunk from the stream

Returns:

  • (self)


# File 'lib/conduit/stream.rb', line 196

def <<(chunk)
  chunk = normalize_chunk(chunk)

  @callbacks.emit(:chunk, chunk)
  @buffer << chunk
  @stats[:chunk] += 1

  process_frames
  self
end
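process_frames itself is not shown on this page. The sketch below illustrates the general technique that #<< implies: append each chunk to a buffer, cut off every complete frame ending in the separator, and keep the incomplete tail for the next chunk. FrameSplitter is a hypothetical, simplified stand-in, not Conduit's implementation.

```ruby
# Hypothetical, simplified frame splitter illustrating buffered SSE framing.
class FrameSplitter
  def initialize(separator: "\n\n")
    @separator = separator
    @buffer = +""
  end

  # Append a chunk and return every frame it completes (without the separator).
  def feed(chunk)
    @buffer << chunk
    frames = []
    while (index = @buffer.index(@separator))
      # Remove the frame plus its separator from the front of the buffer.
      frames << @buffer.slice!(0, index + @separator.length).chomp(@separator)
    end
    frames
  end

  # Bytes still waiting for a separator.
  def pending
    @buffer
  end
end

splitter = FrameSplitter.new
p splitter.feed("data: a\n\nda") # ["data: a"]; the chunk boundary falls mid-frame
p splitter.feed("ta: b\n\n")     # ["data: b"]
```

With a real stream, the chunks would typically come from Net::HTTP's response.read_body block form, each one passed to stream << chunk.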

#buffer_size ⇒ Integer

Current buffer size in bytes.

Returns the size of the internal buffer, useful for monitoring memory usage during long-running streams.

Returns:

  • (Integer)

    Buffer size in bytes



# File 'lib/conduit/stream.rb', line 53

def buffer_size
  @buffer.bytesize
end
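buffer_size reports String#bytesize rather than #length, which differs once the buffer holds multibyte UTF-8 text. A small illustration, plus a hypothetical watchdog a caller might run after each chunk; the 1 MiB limit and the over_limit? helper are arbitrary and not part of Conduit.

```ruby
buffer = +"data: caf\u00e9"
puts buffer.length   # 10 characters
puts buffer.bytesize # 11 bytes, because "\u00e9" takes two bytes in UTF-8

MAX_BUFFER_BYTES = 1024 * 1024 # hypothetical limit

# True when an unterminated frame has grown past the limit.
def over_limit?(buffer_size, limit = MAX_BUFFER_BYTES)
  buffer_size > limit
end

puts over_limit?(buffer.bytesize) # false
```

Against a real stream the check would read over_limit?(stream.buffer_size).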

#each {|parsed| ... } ⇒ Enumerator, self

Enumerable interface for iterating over parsed events.

Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers the block as an on_parsed callback (so it receives results parsed from then on) and returns self for chaining.

Yields:

  • (parsed)

    The result of your parser

Returns:

  • (Enumerator, self)


# File 'lib/conduit/stream.rb', line 232

def each(&block)
  return enum_for(:each) unless block

  on_parsed(&block)
  self
end
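The block form of #each does not walk over anything already received; it registers the block for future results, while the block-less form uses Ruby's common `return enum_for(:each) unless block` idiom. The self-contained sketch below reproduces that shape with a hypothetical Emitter class so the behaviour can be observed without Conduit.

```ruby
# Hypothetical class reproducing the shape of Conduit::Stream#each.
class Emitter
  def initialize
    @listeners = []
  end

  def on_parsed(&block)
    @listeners << block
  end

  def each(&block)
    return enum_for(:each) unless block

    on_parsed(&block)
    self
  end

  # Stand-in for the stream delivering a parsed result to its callbacks.
  def emit(parsed)
    @listeners.each { |listener| listener.call(parsed) }
  end
end

seen = []
emitter = Emitter.new.each { |parsed| seen << parsed } # returns the emitter itself
emitter.emit({ "n" => 1 })                             # the block runs only now
p seen.length            # 1
p Emitter.new.each.class # Enumerator
```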

#finish ⇒ self (also known as: close)

Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.

Call this when the underlying transport closes cleanly without a trailing “\n\n” (typical for many HTTP SSE servers). It is safe to call multiple times, safe to call on an empty buffer, and safe to keep using the stream afterwards.

Returns:

  • (self)


# File 'lib/conduit/stream.rb', line 215

def finish
  return self if @buffer.empty?

  remainder = @buffer.slice!(0, @buffer.length)
  process_frame(remainder)
  self
end
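Because #finish hands whatever remains in the buffer to process_frame, a server that closes the connection right after its last data line still produces a final event. A hypothetical minimal class showing the same flush-the-remainder pattern and its idempotency; the alias_method line illustrates the close alias, though this page does not show how Conduit declares it.

```ruby
# Hypothetical minimal buffer illustrating the end-of-input flush.
class TailFlusher
  attr_reader :frames

  def initialize(separator: "\n\n")
    @separator = separator
    @buffer = +""
    @frames = []
  end

  def <<(chunk)
    @buffer << chunk
    while (index = @buffer.index(@separator))
      @frames << @buffer.slice!(0, index + @separator.length).chomp(@separator)
    end
    self
  end

  # Treat any unterminated remainder as one last frame; a no-op when empty.
  def finish
    return self if @buffer.empty?

    @frames << @buffer.slice!(0, @buffer.length)
    self
  end
  alias_method :close, :finish
end

flusher = TailFlusher.new
flusher << "data: one\n\ndata: two" # the second frame has no trailing separator
flusher.finish.close                 # the second call finds an empty buffer
p flusher.frames                     # ["data: one", "data: two"]
```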

#on_chunk {|chunk| ... } ⇒ Object

Registers a callback for each raw chunk as it arrived (after normalization).

The chunk is a string that has been passed through the chunk_normalizer (by default: UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.

Yields:

  • (chunk)

    The normalized chunk string



# File 'lib/conduit/stream.rb', line 77

def on_chunk(&block)
  @callbacks.on(:chunk, &block)
end
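Defaults::CHUNK_NORMALIZER itself is not shown on this page. The lambda below is a hypothetical normalizer with the described behaviour (CRLF to LF, UTF-8 encoding), assuming a chunk_normalizer is called with the chunk string and returns the replacement chunk. It rewrites a binary copy first, so invalid or split UTF-8 sequences never raise here and can be reassembled in the buffer; a CRLF pair split across two chunks is not handled by this per-chunk sketch.

```ruby
# Hypothetical normalizer: convert CRLF to LF on a binary copy, then label it UTF-8.
CRLF_NORMALIZER = lambda do |chunk|
  chunk.b.gsub("\r\n", "\n").force_encoding(Encoding::UTF_8)
end

raw = "data: hi\r\n\r\n".b # binary string, as an HTTP client may deliver it
normalized = CRLF_NORMALIZER.call(raw)
p normalized          # "data: hi\n\n"
p normalized.encoding # #<Encoding:UTF-8>
```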

#on_error {|error| ... } ⇒ Object

Registers a handler for errors raised by any callback or by the parser.

When a callback (other than on_error itself) or the parser raises an error, it is routed to this handler if one is registered, so the error does not interrupt stream processing. Each routed error also increments stats[:error].

If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.

Yields:

  • (error)

    The exception that was raised



# File 'lib/conduit/stream.rb', line 180

def on_error(&block)
  wrapped_block = proc do |error|
    @stats[:error] += 1
    block.call(error)
  end
  @callbacks.on(:error, &wrapped_block)
end
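The Callbacks class is not documented on this page, so the sketch below is a hypothetical reimplementation of the routing described above: handlers other than :error run inside a rescue, and the exception goes to the :error handlers when any are registered, or is re-raised otherwise. An exception raised by an :error handler itself propagates. The stats counter is not modelled.

```ruby
# Hypothetical callback registry illustrating the documented on_error routing.
class MiniCallbacks
  def initialize
    @handlers = Hash.new { |hash, name| hash[name] = [] }
  end

  def on(name, &block)
    @handlers[name] << block
  end

  def emit(name, *args)
    @handlers[name].each do |handler|
      begin
        handler.call(*args)
      rescue StandardError => e
        # Errors from error handlers, or with no error handler registered, bubble up.
        raise if name == :error || @handlers[:error].empty?

        emit(:error, e)
      end
    end
  end
end

callbacks = MiniCallbacks.new
errors = []
callbacks.on(:error) { |error| errors << error.message }
callbacks.on(:parsed) { |_parsed| raise "boom" }
callbacks.emit(:parsed, { "ok" => true }) # does not raise; "boom" is routed
p errors # ["boom"]
```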

#on_event(type: nil) {|event| ... } ⇒ Object

Registers a callback that receives each fully parsed SSE event as a Conduit::Event.

This callback receives a Conduit::Event object with the following attributes:

  • event: Event type (defaults to “message”)

  • data: Joined data field content (data lines joined by “\n”)

  • id: Last event ID from the SSE spec

  • retry: Retry delay in milliseconds from the SSE spec

Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry).

Parameters:

  • type (String, Array<String>, nil) (defaults to: nil)

    Optional event type(s) to filter by. If provided, the callback only triggers for matching event types.

Yields:

  • (event)

    A Conduit::Event object



# File 'lib/conduit/stream.rb', line 107

def on_event(type: nil, &block)
  if type
    wrapped_block = proc do |event|
      filter_match = Array(type).include?(event.event)
      next unless filter_match

      block.call(event)
    end
    @callbacks.on(:event, &wrapped_block)
  else
    @callbacks.on(:event, &block)
  end
end
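The type: filter relies on Kernel#Array, which wraps a single String in an array and leaves an Array unchanged, so both type: "update" and type: ["update", "delete"] work. Since Array(nil) is [], which would never match, nil takes the unfiltered branch instead. A standalone sketch of the same predicate, with a hypothetical Struct carrying only the two Conduit::Event attributes needed here:

```ruby
# Hypothetical stand-in for Conduit::Event with just the attributes used below.
Event = Struct.new(:event, :data, keyword_init: true)

# Same test as the wrapped block in on_event.
def matches_type?(event, type)
  Array(type).include?(event.event)
end

update = Event.new(event: "update", data: "{}")
message = Event.new(event: "message", data: "hi")

p matches_type?(update, "update")           # true
p matches_type?(message, %w[update delete]) # false
p Array("update")                           # ["update"]
```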

#on_field {|name, value| ... } ⇒ Object

Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.

Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.

Yields:

  • (name, value)

    The field name and value as strings



# File 'lib/conduit/stream.rb', line 166

def on_field(&block)
  @callbacks.on(:field, &block)
end
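How Conduit splits a field line is not shown on this page. For reference, the SSE specification splits at the first colon, drops a single leading space from the value, treats a line with no colon as a field name with an empty value, and treats lines starting with a colon as comments. A minimal sketch of those rules; parse_field_line is a hypothetical helper, and Conduit's own parsing may differ in details.

```ruby
# Split one SSE line into [name, value] following the spec's field rules.
# Returns nil for comment lines (those starting with ":").
def parse_field_line(line)
  return nil if line.start_with?(":")

  name, separator, value = line.partition(":")
  return [line, ""] if separator.empty? # no colon: the whole line is the name

  [name, value.delete_prefix(" ")] # remove at most one leading space
end

p parse_field_line("event: update") # ["event", "update"]
p parse_field_line("data:  two")    # ["data", " two"]
p parse_field_line("data")          # ["data", ""]
p parse_field_line(": keep-alive")  # nil
```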

#on_frame {|frame| ... } ⇒ Object

Complete frame text (after sanitization), regardless of whether it produces an event.

A frame is the text between frame separators (default: “\n\n”). This callback receives the raw frame string after sanitization (default: strip), including frames that do not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.

Yields:

  • (frame)

    The sanitized frame string



# File 'lib/conduit/stream.rb', line 89

def on_frame(&block)
  @callbacks.on(:frame, &block)
end
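on_frame fires even for frames that never become events. Assuming the default sanitizer behaves like String#strip and the data prefix is "data:" (an assumed value, since Defaults::PAYLOAD_START is not shown), the hypothetical helper below shows which sanitized frames would also reach on_event. The field name is derived with chomp(":") in the same way as @data_field.

```ruby
PAYLOAD_START = "data:"                # assumed value, for illustration only
DATA_FIELD = PAYLOAD_START.chomp(":") # "data", derived like @data_field

SANITIZER = ->(frame) { frame.strip } # mirrors the documented default (strip)

# True when at least one line of the frame names the data field.
def produces_event?(frame)
  frame.each_line.any? { |line| line.chomp.partition(":").first == DATA_FIELD }
end

frames = ["  event: update\ndata: {}\n", "id: 7\nretry: 500"].map(&SANITIZER)
p frames.map { |frame| produces_event?(frame) } # [true, false]
```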

#on_parsed(type: nil) {|parsed| ... } ⇒ Object

Result of running the configured parser over an event’s data.

The parser receives ONLY the data field content (data lines joined by “\n”), not the entire frame. If you need the other SSE fields (event type, id, retry), use on_event instead.

If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event.

Parameters:

  • type (String, Array<String>, nil) (defaults to: nil)

    Optional event type(s) to filter by. If provided, the callback only triggers for matching event types.

Yields:

  • (parsed)

    Whatever your parser lambda returns



# File 'lib/conduit/stream.rb', line 132

def on_parsed(type: nil, &block)
  if type
    wrapped_block = proc do |parsed|
      # on_parsed callbacks only receive the parser's output, so filter on the
      # type of the event currently being dispatched, tracked in @last_event_type.
      filter_match = Array(type).include?(@last_event_type || "message")
      next unless filter_match

      block.call(parsed)
    end
    @callbacks.on(:parsed, &wrapped_block)
  else
    @callbacks.on(:parsed, &block)
  end
end
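Because on_parsed callbacks receive only the parser's output, the type filter reads the stream's @last_event_type, which must hold the type of the event being dispatched at the moment the :parsed callbacks run. The hypothetical MiniDispatcher below illustrates that ordering; how Conduit actually sets @last_event_type is not shown on this page.

```ruby
require "json"

# Hypothetical dispatcher showing why the type is recorded before emitting.
class MiniDispatcher
  def initialize(parser)
    @parser = parser
    @last_event_type = nil
    @listeners = []
  end

  def on_parsed(type: nil, &block)
    listener = lambda do |parsed|
      next if type && !Array(type).include?(@last_event_type || "message")

      block.call(parsed)
    end
    @listeners << listener
  end

  def dispatch(event_type, data)
    @last_event_type = event_type # set first, so filters see the current type
    parsed = @parser.call(data)
    @listeners.each { |listener| listener.call(parsed) }
  end
end

updates = []
dispatcher = MiniDispatcher.new(->(data) { JSON.parse(data) })
dispatcher.on_parsed(type: "update") { |parsed| updates << parsed }
dispatcher.dispatch("update", '{"id":1}')
dispatcher.dispatch(nil, '{"id":2}') # no event field, so it counts as "message"
p updates.length # 1
```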

#on_ping {|frame| ... } ⇒ Object

Registers a callback for ping/comment frames.

Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.

Yields:

  • (frame)

    The ping frame string



# File 'lib/conduit/stream.rb', line 155

def on_ping(&block)
  @callbacks.on(:ping, &block)
end
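How ping_pattern is matched is not shown on this page; the sketch assumes a simple prefix check against the default ":", which lines up with SSE comment lines. classify is a hypothetical helper that routes a sanitized frame the way this page describes: pings go to on_ping only, and everything else goes to on_frame.

```ruby
PING_PATTERN = ":" # documented default; prefix matching is an assumption

# Decide which callback family a sanitized frame would be routed to.
def classify(frame, ping_pattern = PING_PATTERN)
  frame.start_with?(ping_pattern) ? :ping : :frame
end

p classify(": keep-alive") # :ping
p classify("data: {}")     # :frame
```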

#stats ⇒ Hash

Stream statistics.

Returns a hash with counts of all processed items, useful for monitoring and debugging without the overhead of the Inspector’s logging.

Returns:

  • (Hash)

    Statistics hash with keys: :chunk, :frame, :event, :parsed, :ping, :field, :error, :avg_fields_per_frame



# File 'lib/conduit/stream.rb', line 63

def stats
  stats = @stats.dup
  avg_fields = @stats[:frame].positive? ? (@total_fields.to_f / @stats[:frame]).round(2) : 0
  stats[:avg_fields_per_frame] = avg_fields
  stats
end
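@stats is a Hash.new(0), so counters can be incremented without initialising keys, and reading a key that was never incremented returns 0. The average is total fields divided by frames, rounded to two decimal places, with 0 when no frames were seen. A standalone sketch of the same arithmetic with made-up counts:

```ruby
counts = Hash.new(0)
4.times { counts[:frame] += 1 }
total_fields = 10 # made-up total across those four frames

stats = counts.dup
stats[:avg_fields_per_frame] =
  counts[:frame].positive? ? (total_fields.to_f / counts[:frame]).round(2) : 0

p stats[:frame]                # 4
p stats[:avg_fields_per_frame] # 2.5
p counts[:error]               # 0 (the default value; no key is created)
```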