Class: Clacky::OpenAIStreamAggregator
- Inherits:
-
Object
- Object
- Clacky::OpenAIStreamAggregator
- Defined in:
- lib/clacky/openai_stream_aggregator.rb
Overview
Reassembles an OpenAI-compatible chat-completion event stream into the non-streaming response shape that MessageFormat::OpenAI.parse_response consumes, while invoking on_chunk(input_tokens:, output_tokens:) every time the upstream emits a new usage frame.
Streaming frames look like:
{"id":"...","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_x","function":{"name":"shell","arguments":"{\"cmd"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"ls\"}"}}]}}]}
{"id":"...","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}
{"id":"...","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":3,"prompt_tokens_details":{"cached_tokens":2}}}
data: [DONE]
Instance Attribute Summary collapse
-
#bytes_seen ⇒ Object
readonly
Returns the value of attribute bytes_seen.
-
#frames_seen ⇒ Object
readonly
Returns the value of attribute frames_seen.
-
#parse_failures ⇒ Object
readonly
Returns the value of attribute parse_failures.
Instance Method Summary collapse
- #handle(data_str) ⇒ Object
-
#initialize(on_chunk: nil) ⇒ OpenAIStreamAggregator
constructor
A new instance of OpenAIStreamAggregator.
- #saw_done? ⇒ Boolean
-
#to_h ⇒ Object
Render the canonical non-streaming response shape.
Constructor Details
#initialize(on_chunk: nil) ⇒ OpenAIStreamAggregator
Returns a new instance of OpenAIStreamAggregator.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 21 def initialize(on_chunk: nil) @on_chunk = on_chunk @content = +"" @reasoning_content = +"" @role = "assistant" @finish_reason = nil @tool_calls = {} @usage = nil @last_input_tokens = 0 @last_output_tokens = 0 @parse_failures = 0 @frames_seen = 0 @bytes_seen = 0 @saw_done = false end |
Instance Attribute Details
#bytes_seen ⇒ Object (readonly)
Returns the value of attribute bytes_seen.
37 38 39 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 37 def bytes_seen @bytes_seen end |
#frames_seen ⇒ Object (readonly)
Returns the value of attribute frames_seen.
37 38 39 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 37 def frames_seen @frames_seen end |
#parse_failures ⇒ Object (readonly)
Returns the value of attribute parse_failures.
37 38 39 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 37 def parse_failures @parse_failures end |
Instance Method Details
#handle(data_str) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 43 def handle(data_str) @bytes_seen += data_str.bytesize if data_str == "[DONE]" @saw_done = true return end @frames_seen += 1 data = parse_or_nil(data_str) return unless data if (choice = (data["choices"] || []).first) delta = choice["delta"] || {} @role = delta["role"] if delta["role"] @content << delta["content"] if delta["content"].is_a?(String) @reasoning_content << delta["reasoning_content"] if delta["reasoning_content"].is_a?(String) if (tcs = delta["tool_calls"]) tcs.each { |tc| merge_tool_call(tc) } end @finish_reason = choice["finish_reason"] if choice["finish_reason"] emit_estimate_progress end if (u = data["usage"]) @usage = u emit_usage_progress(u) end end |
#saw_done? ⇒ Boolean
39 40 41 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 39 def saw_done? @saw_done end |
#to_h ⇒ Object
Render the canonical non-streaming response shape.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/clacky/openai_stream_aggregator.rb', line 72 def to_h tool_calls = @tool_calls.keys.sort.map do |idx| tc = @tool_calls[idx] { "id" => tc[:id], "type" => tc[:type] || "function", "function" => { "name" => tc[:name], "arguments" => tc[:arguments].to_s } } end = { "role" => @role, "content" => @content.empty? ? nil : @content } ["tool_calls"] = tool_calls unless tool_calls.empty? ["reasoning_content"] = @reasoning_content unless @reasoning_content.empty? { "choices" => [{ "index" => 0, "message" => , "finish_reason" => @finish_reason }], "usage" => @usage || {} } end |