9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/llm_gateway/adapters/anthropic/stream_mapper.rb', line 9
# Maps one parsed Anthropic streaming chunk onto a gateway stream event.
#
# The block type announced by each "content_block_start" chunk is stored in
# +content_block_types+ under the block's index. Later "content_block_delta"
# and "content_block_stop" chunks are dispatched on that stored type.
#
# @param chunk [Hash] symbol-keyed hash; +:event+ selects the branch and the
#   payload is read from +:data+
# @return [AssistantStreamMessageEvent, AssistantStreamEvent,
#   AssistantToolStartEvent, AssistantStreamReasoningEvent, nil] the mapped
#   event; nil for "ping", for events not listed below, and for content
#   blocks whose type is not "thinking", "text" or "tool_use"
# @raise [LlmGateway::Errors::PromptTooLong] on an "error" event whose message
#   satisfies +LlmGateway::Errors.context_overflow_message?+
# @raise [LlmGateway::Errors::OverloadError] on an "error" event whose error
#   type is "overloaded_error"
# @raise [LlmGateway::Errors::APIStatusError] on any other "error" event
def map(chunk)
  case chunk[:event]
  when "message_start"
    delta = {
      id: chunk.dig(:data, :message, :id),
      model: chunk.dig(:data, :message, :model),
      role: chunk.dig(:data, :message, :role)
    }
    usage_increment = chunk.dig(:data, :message, :usage) || {}
    AssistantStreamMessageEvent.new(type: :message_start, usage_increment:, delta:)
  when "content_block_start"
    content_index = chunk.dig(:data, :index)
    delta = chunk.dig(:data, :content_block, :text)
    current_type = chunk.dig(:data, :content_block, :type)
    # Remember the type so delta/stop chunks for this index can be dispatched.
    content_block_types[content_index] = current_type
    case current_type
    when "thinking"
      AssistantStreamEvent.new(type: :reasoning_start, content_index:, delta:)
    when "text"
      AssistantStreamEvent.new(type: :text_start, content_index:, delta:)
    when "tool_use"
      id = chunk.dig(:data, :content_block, :id)
      name = chunk.dig(:data, :content_block, :name)
      AssistantToolStartEvent.new(type: :tool_start, content_index:, delta:, id:, name:)
    end
  when "content_block_delta"
    content_index = chunk.dig(:data, :index)
    case content_block_types[content_index]
    when "thinking"
      delta = chunk.dig(:data, :delta, :thinking)
      signature = chunk.dig(:data, :delta, :signature)
      AssistantStreamReasoningEvent.new(type: :reasoning_delta, signature:, delta:, content_index:)
    when "text"
      delta = chunk.dig(:data, :delta, :text)
      AssistantStreamEvent.new(type: :text_delta, content_index:, delta:)
    when "tool_use"
      delta = chunk.dig(:data, :delta, :partial_json)
      AssistantStreamEvent.new(type: :tool_delta, content_index:, delta:)
    end
  when "content_block_stop"
    content_index = chunk.dig(:data, :index)
    type = case content_block_types[content_index]
           when "thinking" then :reasoning_end
           when "text" then :text_end
           when "tool_use" then :tool_end
           end
    # Unhandled block types yield nil from the start/delta branches, so skip
    # the end event as well rather than emitting one whose type is nil.
    return unless type

    AssistantStreamEvent.new(type:, content_index:, delta: "")
  when "message_delta"
    delta = normalize_message_delta(chunk.dig(:data, :delta) || {})
    usage_increment = chunk.dig(:data, :usage) || {}
    AssistantStreamMessageEvent.new(type: :message_delta, usage_increment:, delta:)
  when "message_stop"
    AssistantStreamMessageEvent.new(type: :message_end, usage_increment: {}, delta: {})
  when "ping"
    nil
  when "error"
    error = chunk.dig(:data, :error) || {}
    message = error[:message] || "Stream error"
    code = error[:type]
    # Most specific error first: context overflow, then overload, then generic.
    if LlmGateway::Errors.context_overflow_message?(message)
      raise LlmGateway::Errors::PromptTooLong.new(message, code)
    end
    raise LlmGateway::Errors::OverloadError.new(message, code) if code == "overloaded_error"

    raise LlmGateway::Errors::APIStatusError.new(message, code)
  end
end
|