Class: ActiveHarness::Http::StreamingClient
- Inherits:
-
Object
- Object
- ActiveHarness::Http::StreamingClient
- Defined in:
- lib/active_harness/http/streaming_client.rb
Overview
Streaming variant of Client. Calls on_token for each content token as it arrives via SSE. Accumulates and returns the full content string when the stream ends.
Instance Method Summary collapse
-
#post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil) ⇒ Hash
{ content: String, usage: Hash|nil }.
Instance Method Details
#post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil) ⇒ Hash
Returns { content: String, usage: Hash|nil }.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/active_harness/http/streaming_client.rb', line 19 def post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil) http = Net::HTTP.new(url.host, url.port) http.use_ssl = true http.open_timeout = timeout http.read_timeout = timeout req = Net::HTTP::Post.new(url) headers.each { |k, v| req[k] = v } req.body = body buffer = "" content = "" usage = {} http.request(req) do |response| response.read_body do |chunk| buffer += chunk while (line_end = buffer.index("\n")) line = buffer.slice!(0, line_end + 1).strip next unless line.start_with?("data: ") data = line.delete_prefix("data: ") next if data == "[DONE]" parsed = JSON.parse(data) rescue next info = parse_chunk ? parse_chunk.call(parsed) : default_chunk(parsed) token = info[:token] if token && !token.empty? on_token.call(token) content += token end usage = usage.merge(info[:usage]) if info[:usage] end end end { content: content, usage: usage.empty? ? nil : usage } rescue Net::OpenTimeout, Net::ReadTimeout raise Errors::TimeoutError, "Request to #{url.host} timed out" rescue => e raise Errors::ProviderUnavailableError, "#{url.host} unreachable: #{e.}" end |