Class: Clacky::OpenAIStreamAggregator
- Inherits: Object
  - Object
  - Clacky::OpenAIStreamAggregator
- Defined in: lib/clacky/openai_stream_aggregator.rb
Overview
Reassembles an OpenAI-compatible chat-completion event stream into the non-streaming response shape that MessageFormat::OpenAI.parse_response consumes, while invoking on_chunk(input_tokens:, output_tokens:) every time the upstream emits a new usage frame.
Streaming frames look like:
{"id":"...","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_x","function":{"name":"shell","arguments":"{\"cmd"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"ls\"}"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
{"id":"...","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":3,"prompt_tokens_details":{"cached_tokens":2}}}
data: [DONE]
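The tool-call frames above carry the `arguments` string in fragments that share an `index`. The private `merge_tool_call` helper is not shown on this page, so the following is only a minimal sketch of how such fragments could be reassembled; `merge_tool_call_fragments` is a hypothetical name, not part of Clacky.

```ruby
require "json"

# Hypothetical helper (not Clacky's merge_tool_call): groups streamed
# tool-call fragments by "index" and concatenates their argument strings.
def merge_tool_call_fragments(frames)
  calls = {}
  frames.each do |frame|
    choice = (JSON.parse(frame)["choices"] || []).first
    next unless choice

    (choice.dig("delta", "tool_calls") || []).each do |tc|
      slot = (calls[tc["index"]] ||= { id: nil, name: nil, arguments: +"" })
      fn = tc["function"] || {}
      slot[:id] ||= tc["id"]       # id arrives only on the first fragment
      slot[:name] ||= fn["name"]   # so does the function name
      slot[:arguments] << fn["arguments"] if fn["arguments"].is_a?(String)
    end
  end
  calls.sort_by { |idx, _call| idx }.map(&:last)
end

# The two tool-call frames from the overview, with a placeholder id.
frames = [
  '{"id":"a","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_x","function":{"name":"shell","arguments":"{\"cmd"}}]}}]}',
  '{"id":"a","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"ls\"}"}}]}}]}'
]

merged = merge_tool_call_fragments(frames)
args = JSON.parse(merged.first[:arguments]) # parseable only once all fragments are joined
```

Neither fragment is valid JSON on its own, which is why the arguments are accumulated as a plain string and parsed only after the stream finishes.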
Instance Method Summary
- #handle(data_str) ⇒ Object
- #initialize(on_chunk: nil) ⇒ OpenAIStreamAggregator (constructor)
  A new instance of OpenAIStreamAggregator.
- #to_h ⇒ Object
  Render the canonical non-streaming response shape.
Constructor Details
#initialize(on_chunk: nil) ⇒ OpenAIStreamAggregator
Returns a new instance of OpenAIStreamAggregator.
# File 'lib/clacky/openai_stream_aggregator.rb', line 21

def initialize(on_chunk: nil)
  @on_chunk = on_chunk
  @content = +""
  @reasoning_content = +""
  @role = "assistant"
  @finish_reason = nil
  @tool_calls = {}
  @usage = nil
  @last_input_tokens = 0
  @last_output_tokens = 0
end
Instance Method Details
#handle(data_str) ⇒ Object
# File 'lib/clacky/openai_stream_aggregator.rb', line 33

def handle(data_str)
  return if data_str == "[DONE]"
  data = parse_or_nil(data_str)
  return unless data

  if (choice = (data["choices"] || []).first)
    delta = choice["delta"] || {}
    @role = delta["role"] if delta["role"]
    @content << delta["content"] if delta["content"].is_a?(String)
    @reasoning_content << delta["reasoning_content"] if delta["reasoning_content"].is_a?(String)
    if (tcs = delta["tool_calls"])
      tcs.each { |tc| merge_tool_call(tc) }
    end
    @finish_reason = choice["finish_reason"] if choice["finish_reason"]
    emit_estimate_progress
  end

  if (u = data["usage"])
    @usage = u
    emit_usage_progress(u)
  end
end
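#handle relies on private helpers (`parse_or_nil`, `emit_usage_progress`) whose bodies are not shown on this page. The sketch below illustrates one plausible shape for the usage path, assuming malformed frames are silently skipped and that `prompt_tokens` / `completion_tokens` map to the `input_tokens:` / `output_tokens:` keywords named in the overview. The `parse_or_nil` body and the `report_usage` name are assumptions for illustration only.

```ruby
require "json"

# Hypothetical stand-in for the private parse_or_nil: returns nil
# instead of raising when a frame is not valid JSON.
def parse_or_nil(data_str)
  JSON.parse(data_str)
rescue JSON::ParserError
  nil
end

# Hypothetical stand-in for the usage callback path: forwards token
# counts from a usage hash to an on_chunk callable, if one was given.
def report_usage(usage, on_chunk)
  return unless on_chunk

  on_chunk.call(
    input_tokens: usage["prompt_tokens"].to_i,
    output_tokens: usage["completion_tokens"].to_i
  )
end

seen = []
on_chunk = ->(input_tokens:, output_tokens:) { seen << [input_tokens, output_tokens] }

frame = '{"id":"a","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":3}}'
data = parse_or_nil(frame)
report_usage(data["usage"], on_chunk) if data && data["usage"]

malformed = parse_or_nil("{not json") # skipped rather than raised
```

A caller would pass a lambda like `on_chunk` above to the constructor to receive token counts while the stream is still in flight.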
#to_h ⇒ Object
Render the canonical non-streaming response shape.
# File 'lib/clacky/openai_stream_aggregator.rb', line 57

def to_h
  tool_calls = @tool_calls.keys.sort.map do |idx|
    tc = @tool_calls[idx]
    {
      "id" => tc[:id],
      "type" => tc[:type] || "function",
      "function" => {
        "name" => tc[:name],
        "arguments" => tc[:arguments].to_s
      }
    }
  end

  message = { "role" => @role, "content" => @content.empty? ? nil : @content }
  message["tool_calls"] = tool_calls unless tool_calls.empty?
  message["reasoning_content"] = @reasoning_content unless @reasoning_content.empty?

  {
    "choices" => [{
      "index" => 0,
      "message" => message,
      "finish_reason" => @finish_reason
    }],
    "usage" => @usage || {}
  }
end
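As a rough illustration of consuming this shape, the sketch below reads tool calls out of a hand-written hash laid out the way #to_h renders it. The values are illustrative, not captured output, and the `calls` structure is an assumption about what a caller might want rather than anything Clacky defines.

```ruby
require "json"

# Hand-written hash in the shape #to_h renders; values are illustrative.
response = {
  "choices" => [{
    "index" => 0,
    "message" => {
      "role" => "assistant",
      "content" => nil, # #to_h emits nil when no text content was streamed
      "tool_calls" => [{
        "id" => "call_x",
        "type" => "function",
        "function" => { "name" => "shell", "arguments" => "{\"cmd\":\"ls\"}" }
      }]
    },
    "finish_reason" => "tool_calls"
  }],
  "usage" => { "prompt_tokens" => 12, "completion_tokens" => 3 }
}

message = response.dig("choices", 0, "message")

# "tool_calls" is omitted entirely when empty, so default to [].
calls = (message["tool_calls"] || []).map do |tc|
  {
    name: tc.dig("function", "name"),
    args: JSON.parse(tc.dig("function", "arguments")) # arguments stay a JSON string
  }
end
```

Because `"tool_calls"` and `"reasoning_content"` are only set when non-empty, callers should treat both keys as optional.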