Class: Conduit::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/conduit/stream.rb

Overview

Core streaming parser for Server-Sent Events (SSE).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parser:, chunk_normalizer: nil, frame_separator: nil, payload_start: nil, ping_pattern: nil, sanitize_pattern: nil) ⇒ Stream

Initialize a new Stream with optional customizations.

Parameters:

  • parser (Proc)

    Required. Callable that receives the joined data string of an SSE event and returns whatever shape the application wants.

  • chunk_normalizer (Proc) (defaults to: nil)

    Optional. Transforms incoming chunks before processing.

  • frame_separator (String) (defaults to: nil)

    Optional. Delimiter that separates frames in the stream.

  • payload_start (String) (defaults to: nil)

    Optional. Prefix used to identify the data field (the trailing “:” is stripped to derive the field name).

  • ping_pattern (String) (defaults to: nil)

    Optional. Pattern identifying ping frames.

  • sanitize_pattern (Proc) (defaults to: nil)

    Optional. Cleans or validates frame content.

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/conduit/stream.rb', line 18

def initialize(
  parser:,
  chunk_normalizer: nil,
  frame_separator: nil,
  payload_start: nil,
  ping_pattern: nil,
  sanitize_pattern: nil
)
  raise ArgumentError, "parser must be a Proc (respond to #call)" unless parser.respond_to?(:call)

  @parser           = parser
  @chunk_normalizer = chunk_normalizer || Defaults::CHUNK_NORMALIZER
  @sanitize_pattern = sanitize_pattern || Defaults::SANITIZE_PATTERN
  @frame_separator  = frame_separator  || Defaults::FRAME_SEPARATOR
  @payload_start    = payload_start    || Defaults::PAYLOAD_START
  @ping_pattern     = ping_pattern     || Defaults::PING_PATTERN
  @data_field       = @payload_start.chomp(":")
  @buffer           = +""
  @callbacks        = Callbacks.new
  @last_event_id    = nil
  @retry_ms         = nil
end

Instance Attribute Details

#last_event_idObject (readonly)

Stream state — last id/retry seen, per SSE spec semantics.



42
43
44
# File 'lib/conduit/stream.rb', line 42

def last_event_id
  @last_event_id
end

#retry_msObject (readonly)

Stream state — last id/retry seen, per SSE spec semantics.



42
43
44
# File 'lib/conduit/stream.rb', line 42

def retry_ms
  @retry_ms
end

Instance Method Details

#<<(chunk) ⇒ self

Feed a chunk of data to the stream for processing.

Chunks are typically received from an HTTP stream (e.g., Net::HTTP response body). The chunk is normalized, buffered, and then processed for complete frames. Returns self for method chaining.

Parameters:

  • chunk (String)

    Raw data chunk from the stream

Returns:

  • (self)


140
141
142
143
144
145
146
147
148
# File 'lib/conduit/stream.rb', line 140

def <<(chunk)
  chunk = normalize_chunk(chunk)

  @callbacks.emit(:chunk, chunk)
  @buffer << chunk

  process_frames
  self
end

#each {|parsed| ... } ⇒ Enumerator, self

Enumerable interface for iterating over parsed events.

Provides a convenient way to iterate over the results of your parser. Without a block, returns an Enumerator. With a block, registers an on_parsed callback and returns self for chaining.

Yields:

  • (parsed)

    The result of your parser

Returns:

  • (Enumerator, self)


175
176
177
178
179
180
# File 'lib/conduit/stream.rb', line 175

def each(&block)
  return enum_for(:each) unless block

  on_parsed(&block)
  self
end

#finishself Also known as: close

Signal end-of-input. Processes any bytes left in the buffer as a final frame, so trailing data not terminated by the frame separator still produces an event.

Call this when the underlying transport closes cleanly without a trailing “nn” (typical for many HTTP SSE servers). Safe to call multiple times; safe to call on an empty buffer; safe to keep using the stream afterwards.

Returns:

  • (self)


158
159
160
161
162
163
164
# File 'lib/conduit/stream.rb', line 158

def finish
  return self if @buffer.empty?

  remainder = @buffer.slice!(0, @buffer.length)
  process_frame(remainder)
  self
end

#on_chunk {|chunk| ... } ⇒ Object

Raw chunk as it arrived (after normalization).

The chunk is a string that has been normalized (UTF-8 encoded, CRLF→LF). This is called for every chunk fed to the stream via <<, regardless of whether the chunk contains complete frames or partial data.

Yields:

  • (chunk)

    The normalized chunk string



51
52
53
# File 'lib/conduit/stream.rb', line 51

def on_chunk(&block)
  @callbacks.on(:chunk, &block)
end

#on_error {|error| ... } ⇒ Object

Errors raised by any callback or by the parser.

When a callback (other than on_error itself) or the parser raises an error, it’s routed to this handler if registered. This prevents errors from interrupting the stream processing.

If on_error is not registered, errors will bubble up and interrupt processing. If on_error itself raises, that error will bubble up.

Yields:

  • (error)

    The exception that was raised



128
129
130
# File 'lib/conduit/stream.rb', line 128

def on_error(&block)
  @callbacks.on(:error, &block)
end

#on_event {|event| ... } ⇒ Object

Fully parsed SSE event as a Event.

This callback receives a Conduit::Event object with the following attributes:

  • event: Event type (defaults to “message”)

  • data: Joined data field content (data lines joined by “n”)

  • id: Last event ID from the SSE spec

  • retry: Retry delay in milliseconds from the SSE spec

Only called for frames that contain at least one data field. Use this callback when you need access to SSE metadata (event type, id, retry).

Yields:

  • (event)

    A Conduit::Event object



79
80
81
# File 'lib/conduit/stream.rb', line 79

def on_event(&block)
  @callbacks.on(:event, &block)
end

#on_field {|name, value| ... } ⇒ Object

Every parsed SSE field line. Yields (name, value) for every field, including the standard ones (data/event/id/retry) and any custom fields a server emits.

Per the SSE spec, fields are parsed one per line with the format “name: value”. This callback is invoked for each field line as it’s parsed from the frame.

Yields:

  • (name, value)

    The field name and value as strings



114
115
116
# File 'lib/conduit/stream.rb', line 114

def on_field(&block)
  @callbacks.on(:field, &block)
end

#on_frame {|frame| ... } ⇒ Object

Complete frame text (after sanitization), regardless of whether it produces an event.

A frame is the text between frame separators (default: “nn”). This callback receives the raw frame string after sanitization (default: strip). This includes frames that may not produce events (e.g., frames without data fields). Ping frames are handled separately by on_ping and do not trigger this callback.

Yields:

  • (frame)

    The sanitized frame string



63
64
65
# File 'lib/conduit/stream.rb', line 63

def on_frame(&block)
  @callbacks.on(:frame, &block)
end

#on_parsed {|parsed| ... } ⇒ Object

Result of running the configured parser over an event’s data.

The parser receives ONLY the data field content (joined by “n”), not the entire frame. If you need access to other SSE fields (event type, id, retry), use on_event instead.

If the parser raises an error and an on_error handler is registered, the error is routed to on_error and this callback is NOT invoked for that event.

Yields:

  • (parsed)

    Whatever your parser lambda returns



92
93
94
# File 'lib/conduit/stream.rb', line 92

def on_parsed(&block)
  @callbacks.on(:parsed, &block)
end

#on_ping {|frame| ... } ⇒ Object

Ping/comment frame.

Ping frames are identified by the ping_pattern (default: “:”). These are typically used for keep-alive messages or comments. Ping frames do NOT trigger on_frame or on_event callbacks.

Yields:

  • (frame)

    The ping frame string



103
104
105
# File 'lib/conduit/stream.rb', line 103

def on_ping(&block)
  @callbacks.on(:ping, &block)
end