Module: Buble::Streaming

Defined in:
lib/buble/streaming.rb

Defined Under Namespace

Classes: Event, SSEParser

Class Method Summary

Class Method Details

.events_from_lines(lines) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/buble/streaming.rb', line 60

# Lazily turns a sequence of lines into the events produced by an SSEParser.
#
# A new SSEParser is created every time the returned Enumerator is iterated.
# Each element of +lines+ is handed to +push_line+, and every truthy result
# is emitted. After +lines+ is exhausted, +finish+ is called once and its
# result is emitted as well when it is truthy.
#
# @param lines [#each] source of lines; it is not read until the returned
#   enumerator is iterated
# @return [Enumerator] enumerator over the parser's results (presumably
#   Event instances -- confirm against SSEParser)
def events_from_lines(lines)
  Enumerator.new do |out|
    sse = SSEParser.new
    lines.each do |raw_line|
      parsed = sse.push_line(raw_line)
      out.yield(parsed) if parsed
    end

    # Give the parser a chance to hand back anything it was still holding.
    trailing = sse.finish
    out.yield(trailing) if trailing
  end
end

.text_from_event(event, protocol) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/buble/streaming.rb', line 72

# Pulls the text fragment out of a single streaming event.
#
# Returns nil when the event's data is the literal '[DONE]' marker, when the
# event's +json+ is not a Hash, when +protocol+ is none of the symbols
# handled below, or when the expected keys are absent from the payload.
#
# @param event [#data, #json] event whose +json+ is expected to be the
#   decoded payload (presumably an Event -- confirm against the caller)
# @param protocol [Symbol] one of :openai, :anthropic or :gemini
# @return [Object, nil] the value found at the protocol's text location
#   (normally a String), or nil
def text_from_event(event, protocol)
  return nil if event.data == '[DONE]'

  payload = event.json
  return nil unless payload.is_a?(Hash)

  # Location of the text fragment inside the payload, per protocol.
  path =
    case protocol
    when :openai    then ['choices', 0, 'delta', 'content']
    when :anthropic then ['delta', 'text']
    when :gemini    then ['candidates', 0, 'content', 'parts', 0, 'text']
    end

  path && payload.dig(*path)
end

.text_stream(events, protocol) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/buble/streaming.rb', line 88

# Lazily maps a sequence of events to the text fragments they carry.
#
# Each event is passed to +text_from_event+ together with +protocol+;
# results that are falsy or empty are skipped, everything else is emitted
# in order.
#
# @param events [#each] source of events; it is not read until the returned
#   enumerator is iterated
# @param protocol [Object] forwarded unchanged to +text_from_event+
# @return [Enumerator] enumerator over the non-empty text fragments
def text_stream(events, protocol)
  Enumerator.new do |out|
    events.each do |evt|
      chunk = text_from_event(evt, protocol)
      next unless chunk
      next if chunk.empty?

      out.yield(chunk)
    end
  end
end