Class: Clacky::OpenAIStreamAggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/openai_stream_aggregator.rb

Overview

Reassembles an OpenAI-compatible chat-completion event stream into the non-streaming response shape that MessageFormat::OpenAI.parse_response consumes, while invoking on_chunk(input_tokens:, output_tokens:) every time the upstream emits a new usage frame.

Streaming frames look like:

{"id":"...","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_x","function":{"name":"shell","arguments":"{\"cmd"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"ls\"}"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
{"id":"...","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":3,"prompt_tokens_details":{"cached_tokens":2}}}
data: [DONE]

Instance Method Summary collapse

Constructor Details

#initialize(on_chunk: nil) ⇒ OpenAIStreamAggregator

Returns a new instance of OpenAIStreamAggregator.



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/clacky/openai_stream_aggregator.rb', line 21

# Sets up an empty aggregation state.
#
# @param on_chunk [#call, nil] optional progress callback; it is only stored
#   here and is not invoked by this method
def initialize(on_chunk: nil)
  @on_chunk = on_chunk

  # Assistant message being assembled. The two text buffers are distinct
  # mutable strings so streamed fragments can be appended in place.
  @role = "assistant"
  @content = "".dup
  @reasoning_content = "".dup
  @tool_calls = {}

  # Unknown until the stream supplies them.
  @finish_reason = nil
  @usage = nil

  # Token counters start at zero.
  @last_input_tokens = 0
  @last_output_tokens = 0
end

Instance Method Details

#handle(data_str) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/clacky/openai_stream_aggregator.rb', line 33

# Folds one streamed event payload into the aggregate state.
#
# The "[DONE]" sentinel is ignored, as is any payload for which parse_or_nil
# returns nil or a non-Hash value. Only the first entry of "choices" is read.
#
# NOTE(review): data_str is compared against the bare "[DONE]" string, so the
# caller presumably strips the `data: ` prefix first; confirm against the caller.
#
# @param data_str [String] payload of a single stream event
# @return [void] the return value is incidental and should not be relied on
def handle(data_str)
  return if data_str == "[DONE]"

  data = parse_or_nil(data_str)
  # Valid JSON that is not an object (array, string, number) carries nothing
  # to aggregate; skip it rather than raising from the key lookups below.
  return unless data.is_a?(Hash)

  choices = data["choices"]
  choice = choices.first if choices.is_a?(Array)
  if choice.is_a?(Hash)
    delta = choice["delta"]
    delta = {} unless delta.is_a?(Hash)

    @role = delta["role"] if delta["role"]
    @content << delta["content"] if delta["content"].is_a?(String)
    @reasoning_content << delta["reasoning_content"] if delta["reasoning_content"].is_a?(String)

    # Tool-call fragments arrive as an array; anything else is ignored so a
    # malformed frame is not iterated as key/value pairs.
    tool_calls = delta["tool_calls"]
    tool_calls.each { |tc| merge_tool_call(tc) } if tool_calls.is_a?(Array)

    @finish_reason = choice["finish_reason"] if choice["finish_reason"]
    emit_estimate_progress
  end

  # Keep the most recent usage object and report it.
  if (usage = data["usage"])
    @usage = usage
    emit_usage_progress(usage)
  end
end

#to_h ⇒ Object

Render the canonical non-streaming response shape.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/clacky/openai_stream_aggregator.rb', line 57

# Renders the accumulated state as a non-streaming chat-completion hash.
#
# Tool calls are emitted in ascending order of their key in @tool_calls.
# "content" is nil when no text was accumulated. "tool_calls" and
# "reasoning_content" appear on the message only when non-empty.
#
# @return [Hash] a hash with "choices" (a single choice at index 0) and
#   "usage" (an empty hash when no usage was recorded)
def to_h
  ordered_calls = @tool_calls.sort_by { |index, _call| index }.map do |_index, call|
    function = { "name" => call[:name], "arguments" => call[:arguments].to_s }
    { "id" => call[:id], "type" => call[:type] || "function", "function" => function }
  end

  message = { "role" => @role, "content" => (@content unless @content.empty?) }
  message["tool_calls"] = ordered_calls unless ordered_calls.empty?
  message["reasoning_content"] = @reasoning_content unless @reasoning_content.empty?

  choice = { "index" => 0, "message" => message, "finish_reason" => @finish_reason }
  { "choices" => [choice], "usage" => @usage || {} }
end