Class: ActiveHarness::Http::StreamingClient
- Inherits:
-
Object
- Object
- ActiveHarness::Http::StreamingClient
- Defined in:
- lib/active_harness/http/streaming_client.rb
Overview
Streaming variant of Client. Calls on_token for each content token as it arrives via SSE. Accumulates and returns the full content string when the stream ends.
Instance Method Summary collapse
-
#post(url, headers:, body:, timeout: 60, on_token:) ⇒ String
Full accumulated content.
Instance Method Details
#post(url, headers:, body:, timeout: 60, on_token:) ⇒ String
Returns full accumulated content.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/active_harness/http/streaming_client.rb', line 16 def post(url, headers:, body:, timeout: 60, on_token:) http = Net::HTTP.new(url.host, url.port) http.use_ssl = true http.open_timeout = timeout http.read_timeout = timeout req = Net::HTTP::Post.new(url) headers.each { |k, v| req[k] = v } req.body = body buffer = "" content = "" http.request(req) do |response| response.read_body do |chunk| buffer += chunk while (line_end = buffer.index("\n")) line = buffer.slice!(0, line_end + 1).strip next unless line.start_with?("data: ") data = line.delete_prefix("data: ") next if data == "[DONE]" parsed = JSON.parse(data) token = parsed.dig("choices", 0, "delta", "content") if token && !token.empty? on_token.call(token) content += token end end end end content rescue Net::OpenTimeout, Net::ReadTimeout raise Errors::TimeoutError, "Request to #{url.host} timed out" rescue JSON::ParserError # ignore malformed SSE chunks content rescue => e raise Errors::ProviderUnavailableError, "#{url.host} unreachable: #{e.}" end |