Class: LlmGateway::Adapters::Anthropic::StreamMapper

Inherits:
Object
Defined in:
lib/llm_gateway/adapters/anthropic/stream_mapper.rb

Instance Method Summary collapse

Instance Method Details

#map(chunk) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/llm_gateway/adapters/anthropic/stream_mapper.rb', line 9

# Translates a single parsed Anthropic SSE chunk into the gateway's
# normalized stream-event objects.
#
# @param chunk [Hash] parsed event with `:event` (String) and `:data` (Hash)
# @return [Object, nil] a normalized stream event, or nil for `ping` /
#   unrecognized events
# @raise [LlmGateway::Errors::PromptTooLong] when the error message indicates
#   a context-window overflow
# @raise [LlmGateway::Errors::OverloadError] for `overloaded_error` codes
# @raise [LlmGateway::Errors::APIStatusError] for any other stream error
def map(chunk)
  data = chunk[:data] || {}

  case chunk[:event]
  when "message_start"
    message = data[:message] || {}
    AssistantStreamMessageEvent.new(
      type: :message_start,
      usage_increment: message[:usage] || {},
      delta: { id: message[:id], model: message[:model], role: message[:role] }
    )
  when "content_block_start"
    index = data[:index]
    block = data[:content_block] || {}
    text = block[:text]
    kind = block[:type]
    # Remember each block's type so later delta/stop events (which carry only
    # an index) can be routed to the right event class.
    content_block_types[index] = kind

    if kind == "thinking"
      AssistantStreamEvent.new(type: :reasoning_start, content_index: index, delta: text)
    elsif kind == "text"
      AssistantStreamEvent.new(type: :text_start, content_index: index, delta: text)
    elsif kind == "tool_use"
      AssistantToolStartEvent.new(
        type: :tool_start,
        content_index: index,
        delta: text,
        id: block[:id],
        name: block[:name]
      )
    end
  when "content_block_delta"
    index = data[:index]
    payload = data[:delta] || {}

    # Route by the type recorded at content_block_start for this index.
    case content_block_types[index]
    when "thinking"
      AssistantStreamReasoningEvent.new(
        type: :reasoning_delta,
        signature: payload[:signature],
        delta: payload[:thinking],
        content_index: index
      )
    when "text"
      AssistantStreamEvent.new(type: :text_delta, content_index: index, delta: payload[:text])
    when "tool_use"
      AssistantStreamEvent.new(type: :tool_delta, content_index: index, delta: payload[:partial_json])
    end
  when "content_block_stop"
    index = data[:index]
    stop_type = {
      "thinking" => :reasoning_end,
      "text" => :text_end,
      "tool_use" => :tool_end
    }[content_block_types[index]]
    AssistantStreamEvent.new(type: stop_type, content_index: index, delta: "")
  when "message_delta"
    AssistantStreamMessageEvent.new(
      type: :message_delta,
      usage_increment: data[:usage] || {},
      delta: normalize_message_delta(data[:delta] || {})
    )
  when "message_stop"
    AssistantStreamMessageEvent.new(type: :message_end, usage_increment: {}, delta: {})
  when "ping"
    # Keep-alive only; nothing to surface downstream.
    nil
  when "error"
    err = data[:error] || {}
    message = err[:message] || "Stream error"
    code = err[:type]

    # Context-overflow detection takes precedence over the generic error code.
    raise LlmGateway::Errors::PromptTooLong.new(message, code) if LlmGateway::Errors.context_overflow_message?(message)
    raise LlmGateway::Errors::OverloadError.new(message, code) if code == "overloaded_error"

    raise LlmGateway::Errors::APIStatusError.new(message, code)
  end
end