Class: Clacky::AnthropicStreamAggregator
- Inherits:
-
Object
- Object
- Clacky::AnthropicStreamAggregator
- Defined in:
- lib/clacky/anthropic_stream_aggregator.rb
Overview
Reassembles an Anthropic Messages SSE stream (event: message_start / content_block_start / content_block_delta / content_block_stop / message_delta / message_stop / ping) into the same hash shape that MessageFormat::Anthropic.parse_response expects from a non-streaming response, while invoking on_chunk(input_tokens:, output_tokens:) as usage accumulates.
Wire reference: docs.anthropic.com/en/api/messages-streaming
Instance Method Summary collapse
- #handle(event, data_str) ⇒ Object
-
#initialize(on_chunk: nil) ⇒ AnthropicStreamAggregator
constructor
A new instance of AnthropicStreamAggregator.
-
#to_h ⇒ Object
Canonical non-streaming Anthropic response shape consumed by MessageFormat::Anthropic.parse_response.
Constructor Details
#initialize(on_chunk: nil) ⇒ AnthropicStreamAggregator
Returns a new instance of AnthropicStreamAggregator.
15 16 17 18 19 20 21 22 |
# File 'lib/clacky/anthropic_stream_aggregator.rb', line 15 def initialize(on_chunk: nil) @on_chunk = on_chunk @blocks = {} @stop_reason = nil @usage = {} @last_input_tokens = 0 @last_output_tokens = 0 end |
Instance Method Details
#handle(event, data_str) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/clacky/anthropic_stream_aggregator.rb', line 24 def handle(event, data_str) data = parse_or_nil(data_str) return unless data case event when "message_start" msg = data["message"] || {} if (u = msg["usage"]) @usage.merge!(u) emit_usage_progress end when "content_block_start" idx = data["index"] || @blocks.size cb = data["content_block"] || {} case cb["type"] when "tool_use" @blocks[idx] = { kind: :tool_use, id: cb["id"], name: cb["name"], input_str: +"" } else @blocks[idx] = { kind: :text, text: +"" } end when "content_block_delta" idx = data["index"] || 0 delta = data["delta"] || {} block = (@blocks[idx] ||= { kind: :text, text: +"" }) case delta["type"] when "text_delta" block[:kind] ||= :text block[:text] ||= +"" block[:text] << delta["text"].to_s when "input_json_delta" block[:kind] = :tool_use block[:input_str] ||= +"" block[:input_str] << delta["partial_json"].to_s when "thinking_delta" block[:kind] = :thinking block[:thinking] ||= +"" block[:thinking] << delta["thinking"].to_s end emit_estimate_progress when "content_block_stop" # Nothing to do: blocks are finalised in to_h. when "message_delta" if (d = data["delta"]) @stop_reason = d["stop_reason"] if d["stop_reason"] end if (u = data["usage"]) @usage.merge!(u) emit_usage_progress end when "message_stop", "ping", "error" # no-op end end |
#to_h ⇒ Object
Canonical non-streaming Anthropic response shape consumed by MessageFormat::Anthropic.parse_response.
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/clacky/anthropic_stream_aggregator.rb', line 80 def to_h content_blocks = @blocks.keys.sort.map do |idx| b = @blocks[idx] case b[:kind] when :tool_use input_value = if b[:input_str].to_s.empty? {} else JSON.parse(b[:input_str]) rescue b[:input_str] end { "type" => "tool_use", "id" => b[:id], "name" => b[:name], "input" => input_value } else { "type" => "text", "text" => b[:text].to_s } end end { "content" => content_blocks, "stop_reason" => @stop_reason, "usage" => @usage } end |