Class: LlmGateway::Adapters::Anthropic::StreamMapper
- Inherits:
- LlmGateway::Adapters::StreamMapper
- Inheritance chain: Object → LlmGateway::Adapters::StreamMapper → LlmGateway::Adapters::Anthropic::StreamMapper
- Defined in:
- lib/llm_gateway/adapters/anthropic/stream_mapper.rb
Instance Method Summary
Methods inherited from StreamMapper
Constructor Details
This class inherits a constructor from LlmGateway::Adapters::StreamMapper
Instance Method Details
#map(chunk, &block) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/llm_gateway/adapters/anthropic/stream_mapper.rb', line 9

# Translates a single Anthropic streaming event into a normalized patch and
# pushes that patch onto the accumulator.
#
# Handled values of `chunk[:event]`:
# * "message_start"       -> :message_start carrying id, model and role
# * "content_block_start" -> :reasoning_start / :text_start / :tool_start
# * "content_block_delta" -> :reasoning_delta / :text_delta / :tool_delta
# * "content_block_stop"  -> :reasoning_end / :text_end / :tool_end
# * "message_delta"       -> :message_delta (plus :usage when the payload has one)
# * "message_stop"        -> :message_end
# * "ping"                -> ignored
# * "error"               -> delegated to `raise_stream_error!`
#
# Delta and stop events carry no block type of their own here; they are
# dispatched on the type remembered from the most recent
# "content_block_start". Content block types other than "thinking", "text"
# and "tool_use" produce no patch.
#
# @param chunk [Hash] parsed event; `:event` is the event name and `:data`
#   the symbol-keyed payload
# @param block [Proc] forwarded untouched to every `accumulator.push` call
# @return [Object, nil] the result of `accumulator.push` for start, delta and
#   message events; nil for "content_block_stop", "ping", and anything
#   unrecognized
# @raise presumably whatever `raise_stream_error!` raises for "error" events;
#   that helper is defined outside this method -- confirm there
def map(chunk, &block)
  case chunk[:event]
  when "message_start"
    delta = {
      id: chunk.dig(:data, :message, :id),
      model: chunk.dig(:data, :message, :model),
      role: chunk.dig(:data, :message, :role)
    }
    accumulator.push({ type: :message_start, delta: }, &block)
  when "content_block_start"
    content_block = chunk.dig(:data, :content_block) || {}
    # Remembered so that the following delta/stop events can be routed.
    @current_content_block_type = content_block[:type]

    case @current_content_block_type
    when "thinking"
      accumulator.push({ type: :reasoning_start, delta: content_block[:thinking], signature: "" }, &block)
    when "text"
      accumulator.push({ type: :text_start, delta: content_block[:text] }, &block)
    when "tool_use"
      accumulator.push(
        { type: :tool_start, delta: "", id: content_block[:id], name: content_block[:name] },
        &block
      )
    end
  when "content_block_delta"
    case @current_content_block_type
    when "thinking"
      # A thinking block streams two kinds of delta: one with text only and
      # one with a signature only. Default whichever half is absent to ""
      # so the patch never carries a nil delta or a nil signature.
      delta = chunk.dig(:data, :delta, :thinking) || ""
      signature = chunk.dig(:data, :delta, :signature) || ""
      accumulator.push({ type: :reasoning_delta, signature:, delta: }, &block)
    when "text"
      delta = chunk.dig(:data, :delta, :text)
      accumulator.push({ type: :text_delta, delta: }, &block)
    when "tool_use"
      delta = chunk.dig(:data, :delta, :partial_json)
      accumulator.push({ type: :tool_delta, delta: }, &block)
    end
  when "content_block_stop"
    case @current_content_block_type
    when "thinking"
      accumulator.push({ type: :reasoning_end, delta: "", signature: "" }, &block)
    when "text"
      accumulator.push({ type: :text_end, delta: "" }, &block)
    when "tool_use"
      accumulator.push({ type: :tool_end, delta: "" }, &block)
    end

    # The block is closed; later deltas must not be attributed to it.
    @current_content_block_type = nil
  when "message_delta"
    data = chunk[:data] || {}
    delta = data[:delta] || {}
    patch = { type: :message_delta, delta: }
    # Usage is attached only when the payload actually has the key.
    patch[:usage] = normalized_usage(data[:usage]) if data.key?(:usage)
    accumulator.push(patch, &block)
  when "message_stop"
    accumulator.push({ type: :message_end }, &block)
  when "ping"
    nil
  when "error"
    raise_stream_error!(chunk.dig(:data, :error) || {}, overload_codes: [ "overloaded_error" ])
  end
end