Module: FetchHive::Streaming

Defined in:
lib/fetch_hive/streaming.rb

Overview

Lightweight Server-Sent Events (SSE) parser for streaming Fetch Hive responses.

Sync example:

io = response.body  # any IO or string
FetchHive::Streaming.parse_sse(io) do |event|
  case event["type"]
  when "response" then print event["response"]
  when "usage"    then puts "\nUsage: #{event['usage']}"
  end
end

Class Method Summary

Class Method Details

.parse_sse(io_or_string) {|Hash| ... } ⇒ Object

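Yields each parsed SSE event hash from io_or_string. Stops at the first data: [DONE] line or when the stream is exhausted. Only lines that begin with "data: " (including the trailing space) are parsed; all other lines, blank lines, and payloads that are not valid JSON are silently skipped. When called without a block, returns an Enumerator over the same events.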

Parameters:

  • io_or_string (String, IO, #read)

    the SSE response body

Yields:

  • (Hash)

    parsed JSON event
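
As a rough illustration of the rules above, the sketch below runs a sample body through a small stand-in parser. DemoSSE.parse is a hypothetical helper written for this example and is not part of FetchHive; it only mirrors the documented behavior (lines must start with "data: ", [DONE] ends the stream, malformed JSON is skipped). The event payloads are made up.

```ruby
require "json"

# Hypothetical stand-in that mirrors the documented rules of parse_sse.
# It is not the library code; it exists so this snippet runs on its own.
module DemoSSE
  def self.parse(body)
    return enum_for(:parse, body) unless block_given?

    body.each_line do |raw|
      line = raw.chomp
      next unless line.start_with?("data: ") # comments, blank lines, other fields

      payload = line[6..]
      break if payload.strip == "[DONE]"     # end-of-stream sentinel

      begin
        event = JSON.parse(payload)
      rescue JSON::ParserError
        next                                 # malformed JSON is skipped
      end
      yield event
    end
  end
end

# Made-up sample stream: one comment, one blank line, one malformed payload,
# and one event after [DONE] that should never be seen.
SAMPLE_BODY = <<~SSE
  : keep-alive comment
  data: {"type":"response","response":"Hel"}

  data: {not valid json
  data: {"type":"response","response":"lo"}
  data: {"type":"usage","usage":{"total_tokens":7}}
  data: [DONE]
  data: {"type":"response","response":"ignored"}
SSE

events = DemoSSE.parse(SAMPLE_BODY).to_a
text = events.select { |e| e["type"] == "response" }.map { |e| e["response"] }.join

puts text          # the two response fragments, joined
puts events.length # number of events that survived filtering
```

With the gem loaded, the equivalent call shape would be FetchHive::Streaming.parse_sse(body).to_a, since the method returns an Enumerator when no block is given.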



# File 'lib/fetch_hive/streaming.rb', line 25

def self.parse_sse(io_or_string, &block)
  return enum_for(:parse_sse, io_or_string) unless block

  buf = +""
  reader = io_or_string.respond_to?(:read) ? io_or_string : StringIO.new(io_or_string)

  loop do
    chunk = reader.read(4096)
    break if chunk.nil? || chunk.empty?

    buf << chunk

    while (idx = buf.index("\n"))
      line = buf.slice!(0, idx + 1).chomp
      next unless line.start_with?("data: ")

      payload = line[6..]
      return if payload.strip == "[DONE]"

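      # Parse first, then yield outside the rescue, so a JSON::ParserError
      # raised by the caller's block is not swallowed here.
      begin
        event = JSON.parse(payload)
      rescue JSON::ParserError
        next # skip malformed lines
      end
      yield event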
    end
  end

  # Process any remaining buffer content after stream ends
  buf.each_line do |line|
    line = line.chomp
    next unless line.start_with?("data: ")

    payload = line[6..]
    next if payload.strip == "[DONE]"

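    # Parse first, then yield outside the rescue, so a JSON::ParserError
    # raised by the caller's block is not swallowed here.
    begin
      event = JSON.parse(payload)
    rescue JSON::ParserError
      next # skip malformed lines
    end
    yield event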
  end
end
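
The listing above reads fixed-size chunks and parses only complete lines, so an event split across two reads is reassembled before JSON.parse sees it. A minimal sketch of that buffering step in isolation follows. The each_complete_line helper and the tiny chunk size are assumptions made for the illustration and are not part of FetchHive.

```ruby
require "stringio"

# Hypothetical helper isolating the buffering technique: read fixed-size
# chunks, emit only newline-terminated lines, then flush whatever is left.
def each_complete_line(reader, chunk_size)
  buf = +""
  while (chunk = reader.read(chunk_size)) && !chunk.empty?
    buf << chunk
    # Drain every complete line currently in the buffer.
    while (idx = buf.index("\n"))
      yield buf.slice!(0, idx + 1).chomp
    end
  end
  yield buf unless buf.empty? # trailing line without a final newline
end

# A 5-byte chunk size forces both lines to span several reads.
source = StringIO.new("data: {\"a\":1}\ndata: {\"b\":2}")
lines = []
each_complete_line(source, 5) { |line| lines << line }

p lines
```

The second line has no trailing newline, which is the case the final buf.each_line pass in parse_sse exists to handle.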