Module: FetchHive::Streaming
- Defined in:
- lib/fetch_hive/streaming.rb
Overview
Lightweight Server-Sent Events (SSE) parser for streaming Fetch Hive responses.
Sync example:
io = response.body # any IO or string
FetchHive::Streaming.parse_sse(io) do |event|
case event["type"]
when "response" then print event["response"]
when "usage" then puts "\nUsage: #{event['usage']}"
end
end
Class Method Summary collapse
-
.parse_sse(io_or_string) {|Hash| ... } ⇒ Object
Yields each parsed SSE event hash from
io_or_string.
Class Method Details
.parse_sse(io_or_string) {|Hash| ... } ⇒ Object
Yields each parsed SSE event hash from io_or_string. Stops when it encounters data: [DONE] or the stream is exhausted. Non-data lines, blank lines, and malformed JSON are silently skipped.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fetch_hive/streaming.rb', line 25 def self.parse_sse(io_or_string, &block) return enum_for(:parse_sse, io_or_string) unless block buf = +"" reader = io_or_string.respond_to?(:read) ? io_or_string : StringIO.new(io_or_string) loop do chunk = reader.read(4096) break if chunk.nil? || chunk.empty? buf << chunk while (idx = buf.index("\n")) line = buf.slice!(0, idx + 1).chomp next unless line.start_with?("data: ") payload = line[6..] return if payload.strip == "[DONE]" begin yield JSON.parse(payload) rescue JSON::ParserError # skip malformed lines end end end # Process any remaining buffer content after stream ends buf.each_line do |line| line = line.chomp next unless line.start_with?("data: ") payload = line[6..] next if payload.strip == "[DONE]" begin yield JSON.parse(payload) rescue JSON::ParserError # skip malformed lines end end end |