Class: ActiveHarness::Http::StreamingClient

Inherits:
Object
Defined in:
lib/active_harness/http/streaming_client.rb

Overview

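Streaming variant of Client. Sends a POST request and reads the response body as a stream of Server-Sent Events (SSE), calling on_token with each content token as it arrives. When the stream ends, returns the accumulated content string together with any usage data the provider reported.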
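The line buffering behind this streaming can be sketched in isolation. This is a minimal sketch, assuming a hypothetical helper named each_sse_data that takes the raw chunks as an array instead of reading them from a socket: bytes are appended to a buffer, only complete lines are consumed, and a trailing partial line waits for the next chunk, so a data: line split across two network reads is still parsed whole.

```ruby
require "json"

# Hypothetical helper: yields the payload of each complete SSE "data:" line,
# skipping the "[DONE]" sentinel. A partial line at the end of one chunk stays
# in the buffer until the rest of it arrives.
def each_sse_data(chunks)
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    while (line_end = buffer.index("\n"))
      # strip also drops a trailing "\r" from CRLF line endings
      line = buffer.slice!(0, line_end + 1).strip
      next unless line.start_with?("data:")

      data = line.delete_prefix("data:").lstrip
      next if data == "[DONE]"

      yield data
    end
  end
end

chunks = [
  "data: {\"t\":\"Hel\"}\n\ndata: {\"t\"", # second event is split mid-line
  ":\"lo\"}\n\ndata: [DONE]\n\n"
]
tokens = []
each_sse_data(chunks) { |data| tokens << JSON.parse(data)["t"] }
puts tokens.join # prints Hello
```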

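Instance Method Summary

  • #post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil) ⇒ Hash

    Returns { content: String, usage: Hash|nil }.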

Instance Method Details

#post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil) ⇒ Hash

Returns { content: String, usage: Hash|nil }.

Parameters:

  • url (URI)

    endpoint URI; TLS is always enabled (use_ssl = true), so this should be an https URI
  • headers (Hash{String => String})
  • body (String)

    JSON-serialized body

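  • timeout (Integer) (defaults to: 60)

    in seconds; used as both the connection-open timeout and the read timeout. Net::HTTP applies the read timeout to each socket read, so a stream that keeps sending data is not cut off; only a gap longer than timeout between reads raises Errors::TimeoutError.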

  • on_token (Proc)

    called with each non-empty token string as it arrives

  • parse_chunk (Proc, nil) (defaults to: nil)

    called with the parsed JSON of each SSE data line (normally a Hash); must return { token: String|nil, usage: Hash|nil }. When nil, the OpenAI-compatible default_chunk parser is used.
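For a provider whose stream is not OpenAI-compatible, the caller supplies parse_chunk. The sketch below assumes the Anthropic Messages streaming event shapes (content_block_delta events carrying delta.text, and usage on message_start and message_delta); those shapes should be checked against the provider's current documentation, and the constant name ANTHROPIC_PARSE_CHUNK is hypothetical.

```ruby
require "json"

# Hypothetical parse_chunk for Anthropic-style streaming events (assumed
# shapes). Must return { token: String|nil, usage: Hash|nil }.
ANTHROPIC_PARSE_CHUNK = lambda do |event|
  case event["type"]
  when "content_block_delta"
    # Non-text deltas have no "text" key, so token is nil and gets skipped.
    { token: event.dig("delta", "text"), usage: nil }
  when "message_start"
    { token: nil, usage: event.dig("message", "usage") }
  when "message_delta"
    { token: nil, usage: event["usage"] }
  else
    { token: nil, usage: nil } # ping, content_block_start, message_stop, ...
  end
end

delta = JSON.parse('{"type":"content_block_delta","index":0,' \
                   '"delta":{"type":"text_delta","text":"Hi"}}')
ANTHROPIC_PARSE_CHUNK.call(delta) # => { token: "Hi", usage: nil }
```

It would be passed as parse_chunk: ANTHROPIC_PARSE_CHUNK. The event: lines in such a stream need no handling here, because post only reads data: lines.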

Returns:

  • (Hash)

    { content: String, usage: Hash|nil }
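The usage value is built by merging every non-nil usage hash reported during the stream, so keys reported in different events are combined and a key reported twice keeps its last value; if nothing was reported, usage is nil. A small illustration of that folding rule, with made-up token counts:

```ruby
# Made-up per-event usage reports, in stream order (nil = no usage in event).
reports = [
  { "input_tokens" => 12, "output_tokens" => 1 },
  nil,
  { "output_tokens" => 40 }
]

# Same rule as #post: merge each non-nil report, later keys win,
# and an empty result becomes nil.
usage = reports.compact.reduce({}) { |acc, u| acc.merge(u) }
usage = nil if usage.empty?
usage # => { "input_tokens" => 12, "output_tokens" => 40 }
```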



# File 'lib/active_harness/http/streaming_client.rb', line 19

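def post(url, headers:, body:, timeout: 60, on_token:, parse_chunk: nil)
  http              = Net::HTTP.new(url.host, url.port)
  http.use_ssl      = true
  http.open_timeout = timeout
  http.read_timeout = timeout # applies to each socket read, not the whole stream

  req = Net::HTTP::Post.new(url)
  headers.each { |k, v| req[k] = v }
  req.body = body

  buffer  = +"" # unconsumed bytes; may end with a partial line
  content = +"" # accumulated token text
  usage   = {}

  http.request(req) do |response|
    # A non-2xx reply carries an error body, not an SSE stream, so fail
    # instead of silently returning empty content.
    unless response.is_a?(Net::HTTPSuccess)
      raise Errors::ProviderUnavailableError,
            "#{url.host} returned HTTP #{response.code}"
    end

    response.read_body do |chunk|
      buffer << chunk
      # Consume complete lines only; a partial line waits for the next chunk.
      while (line_end = buffer.index("\n"))
        line = buffer.slice!(0, line_end + 1).strip
        next unless line.start_with?("data:") # skip event:, id:, comments, blanks

        data = line.delete_prefix("data:").lstrip
        next if data == "[DONE]"

        parsed = begin
          JSON.parse(data)
        rescue JSON::ParserError
          next # skip malformed payloads
        end

        info  = parse_chunk ? parse_chunk.call(parsed) : default_chunk(parsed)
        token = info[:token]
        if token && !token.empty?
          on_token.call(token)
          content << token
        end
        usage = usage.merge(info[:usage]) if info[:usage]
      end
    end
  end

  { content: content, usage: usage.empty? ? nil : usage }
rescue Net::OpenTimeout, Net::ReadTimeout
  raise Errors::TimeoutError, "Request to #{url.host} timed out"
rescue Errors::ProviderUnavailableError
  raise # already mapped above (HTTP error status); don't wrap it again
rescue => e
  raise Errors::ProviderUnavailableError, "#{url.host} unreachable: #{e.message}"
end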