Class: Clacky::BedrockStreamAggregator
- Inherits: Object
  - Object
  - Clacky::BedrockStreamAggregator
- Defined in: lib/clacky/bedrock_stream_aggregator.rb
Overview
Reassembles a Bedrock Converse event stream into the same hash shape that MessageFormat::Bedrock.parse_response expects from a non-streaming response, while invoking on_chunk(input_tokens:, output_tokens:) as usage information accumulates.
Bedrock event-stream events handled (passed through as raw event JSON):
messageStart → { role: "assistant" }
contentBlockStart → { start: {toolUse: {toolUseId, name}} | {}, contentBlockIndex: N }
contentBlockDelta → { delta: {text: "..."} | {toolUse: {input: "..."}} | {reasoningContent: {text: "..."}}, contentBlockIndex: N }
contentBlockStop → { contentBlockIndex: N }
messageStop → { stopReason: "end_turn" | "tool_use" | "max_tokens" | ... }
metadata → { usage: {inputTokens, outputTokens, cacheReadInputTokens, cacheWriteInputTokens}, metrics: {...} }
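Tool-use input is streamed as a sequence of partial JSON strings. The aggregator concatenates them into a single string; #to_h then attempts JSON.parse on the result, using {} for empty input and keeping the raw string if parsing fails, so downstream tool dispatch (which calls JSON.parse with a {} fallback) can still handle it.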
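The fragment handling described above can be sketched in isolation. This is a minimal illustration, not the library's code: the fragments and the helper name parse_tool_input are hypothetical, and the {} fallback mirrors what the overview says downstream tool dispatch does.

```ruby
require "json"

# Hypothetical fragments, as they might arrive in successive
# contentBlockDelta events under delta.toolUse.input.
fragments = ['{"city": "Par', 'is", "unit', 's": "metric"}']

# Concatenate into one mutable string (+"" yields an unfrozen string).
input_str = +""
fragments.each { |fragment| input_str << fragment }

# Hypothetical helper: parse the joined string, falling back to {}
# when it is empty or not valid JSON.
def parse_tool_input(str)
  return {} if str.to_s.empty?

  JSON.parse(str)
rescue JSON::ParserError
  {}
end

tool_input = parse_tool_input(input_str)
```

No single fragment is valid JSON on its own, which is why parsing is deferred until the whole input has been joined.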
Instance Method Summary
- #handle(event, data_str) ⇒ Object
- #initialize(on_chunk: nil) ⇒ BedrockStreamAggregator (constructor)
  A new instance of BedrockStreamAggregator.
- #to_h ⇒ Object
  Render the canonical non-streaming Bedrock response hash so the existing MessageFormat::Bedrock.parse_response can consume it unchanged.
Constructor Details
#initialize(on_chunk: nil) ⇒ BedrockStreamAggregator
Returns a new instance of BedrockStreamAggregator.
# File 'lib/clacky/bedrock_stream_aggregator.rb', line 24

def initialize(on_chunk: nil)
  @on_chunk = on_chunk
  @role = "assistant"
  @blocks = {}
  @stop_reason = nil
  @usage = {}
  @last_input_tokens = 0
  @last_output_tokens = 0
end
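As a rough sketch of what an on_chunk callable might look like, assuming it is invoked with the input_tokens: and output_tokens: keywords named in the overview. The token counts below are made up, and the lambda is called directly here because the code that actually invokes the callback is not shown on this page.

```ruby
# Collect every usage update the callback receives.
updates = []
on_chunk = lambda do |input_tokens:, output_tokens:|
  updates << { input: input_tokens, output: output_tokens }
end

# With the real class, the callable would presumably be passed as:
#   Clacky::BedrockStreamAggregator.new(on_chunk: on_chunk)
# Here it is invoked by hand with hypothetical token counts.
on_chunk.call(input_tokens: 12, output_tokens: 0)
on_chunk.call(input_tokens: 12, output_tokens: 34)

latest = updates.last
```

A lambda with required keyword arguments raises ArgumentError if either keyword is missing, which makes a signature mismatch visible early.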
Instance Method Details
#handle(event, data_str) ⇒ Object
# File 'lib/clacky/bedrock_stream_aggregator.rb', line 34

def handle(event, data_str)
  data = parse_or_nil(data_str)
  return unless data

  case event
  when "messageStart"
    @role = data["role"] || @role
  when "contentBlockStart"
    idx = data["contentBlockIndex"] || @blocks.size
    start = data["start"] || {}
    if (tu = start["toolUse"])
      @blocks[idx] = { kind: :tool_use, id: tu["toolUseId"], name: tu["name"], input_str: +"" }
    else
      @blocks[idx] = { kind: :text, text: +"" }
    end
  when "contentBlockDelta"
    idx = data["contentBlockIndex"] || 0
    delta = data["delta"] || {}
    block = (@blocks[idx] ||= { kind: :text, text: +"" })
    if delta["text"]
      block[:kind] ||= :text
      block[:text] ||= +""
      block[:text] << delta["text"]
    elsif (tu = delta["toolUse"])
      block[:kind] = :tool_use
      block[:input_str] ||= +""
      block[:input_str] << tu["input"].to_s
      block[:id] ||= tu["toolUseId"]
      block[:name] ||= tu["name"]
    elsif (rc = delta["reasoningContent"])
      block[:kind] = :reasoning
      block[:reasoning] ||= +""
      block[:reasoning] << rc["text"].to_s
    end
    emit_estimate_progress
  when "contentBlockStop"
    # Nothing to assemble: blocks are kept as-is until messageStop.
  when "messageStop"
    @stop_reason = data["stopReason"] || @stop_reason
  when "metadata"
    if (u = data["usage"])
      @usage.merge!(u)
      emit_usage_progress(u)
    end
  end
end
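The event dispatch in #handle can be illustrated with a trimmed-down stand-in that covers text blocks only. This is a sketch under stated assumptions: the event payloads are hypothetical, the loop is not the class itself, and it omits tool-use, reasoning, and usage handling.

```ruby
require "json"

# Hypothetical (event name, raw JSON payload) pairs in arrival order.
events = [
  ["contentBlockStart", '{"contentBlockIndex":0,"start":{}}'],
  ["contentBlockDelta", '{"contentBlockIndex":0,"delta":{"text":"Hel"}}'],
  ["contentBlockDelta", '{"contentBlockIndex":0,"delta":{"text":"lo"}}'],
  ["contentBlockStop", '{"contentBlockIndex":0}'],
  ["messageStop", '{"stopReason":"end_turn"}']
]

blocks = {}        # contentBlockIndex => accumulated block
stop_reason = nil

events.each do |event, data_str|
  data = JSON.parse(data_str)
  case event
  when "contentBlockStart"
    blocks[data["contentBlockIndex"]] = { kind: :text, text: +"" }
  when "contentBlockDelta"
    # Create the block lazily in case no start event was seen.
    block = (blocks[data["contentBlockIndex"]] ||= { kind: :text, text: +"" })
    text = data.dig("delta", "text")
    block[:text] << text if text
  when "messageStop"
    stop_reason = data["stopReason"]
  end
end
```

Keying blocks by contentBlockIndex rather than appending to an array keeps each delta attached to its own block even if events for different blocks interleave.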
#to_h ⇒ Object
Render the canonical non-streaming Bedrock response hash so the existing MessageFormat::Bedrock.parse_response can consume it unchanged.
# File 'lib/clacky/bedrock_stream_aggregator.rb', line 83

def to_h
  content_blocks = @blocks.keys.sort.map do |idx|
    b = @blocks[idx]
    case b[:kind]
    when :tool_use
      input_value = b[:input_str].to_s.empty? ? {} : (JSON.parse(b[:input_str]) rescue b[:input_str])
      { "toolUse" => { "toolUseId" => b[:id], "name" => b[:name], "input" => input_value } }
    else
      { "text" => b[:text].to_s }
    end
  end

  {
    "output" => { "message" => { "role" => @role, "content" => content_blocks } },
    "stopReason" => @stop_reason,
    "usage" => @usage
  }
end
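For orientation, here is a hypothetical example of the hash shape #to_h renders for a response with one text block and one tool call, followed by one way a consumer might pull out the tool calls. The IDs, tool name, and token counts are invented for illustration.

```ruby
# Hypothetical aggregated result, following the key layout built by #to_h.
response = {
  "output" => {
    "message" => {
      "role" => "assistant",
      "content" => [
        { "text" => "Checking the weather." },
        { "toolUse" => { "toolUseId" => "tool-1", "name" => "get_weather",
                         "input" => { "city" => "Paris" } } }
      ]
    }
  },
  "stopReason" => "tool_use",
  "usage" => { "inputTokens" => 12, "outputTokens" => 34 }
}

# Walk the content blocks and keep only the toolUse payloads.
content = response.dig("output", "message", "content")
tool_calls = content.filter_map { |block| block["toolUse"] }
text = content.filter_map { |block| block["text"] }.join
```

Because the keys are strings rather than symbols, lookups such as response[:output] would return nil; dig with string keys matches the rendered structure.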