Module: Legion::LLM::API::Translators::AnthropicResponse
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/api/translators/anthropic_response.rb
Class Method Summary collapse
- .extract_tool_calls(pipeline_response) ⇒ Object
-
.format(pipeline_response, model:, request_id: nil) ⇒ Object
Format internal pipeline response into Anthropic Messages API shape.
-
.format_chunk(text, index: 0) ⇒ Object
Emit Anthropic streaming events for a single text chunk.
- .format_stop_reason(pipeline_response) ⇒ Object
-
.streaming_events(pipeline_response, model:, request_id: nil, full_text: '') ⇒ Object
Ordered sequence of SSE event hashes for a complete streaming response.
- .token_count(tokens, key) ⇒ Object
Class Method Details
.extract_tool_calls(pipeline_response) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 141 def self.extract_tool_calls(pipeline_response) return [] unless pipeline_response.respond_to?(:tools) Array(pipeline_response.tools).map do |tc| { id: tc.respond_to?(:id) ? tc.id : (tc[:id] || tc['id'] || "toolu_#{SecureRandom.hex(10)}"), name: tc.respond_to?(:name) ? tc.name : (tc[:name] || tc['name'] || ''), arguments: tc.respond_to?(:arguments) ? tc.arguments : (tc[:arguments] || tc['arguments'] || {}) } end end |
.format(pipeline_response, model:, request_id: nil) ⇒ Object
Format internal pipeline response into Anthropic Messages API shape.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 14 def self.format(pipeline_response, model:, request_id: nil) log.debug("[llm][api][anthropic] action=format request_id=#{request_id} model=#{model}") msg = pipeline_response. content = extract_content(msg, pipeline_response) tokens = pipeline_response.respond_to?(:tokens) ? pipeline_response.tokens : nil routing = pipeline_response.respond_to?(:routing) ? (pipeline_response.routing || {}) : {} resolved_model = routing[:model] || routing['model'] || model { id: request_id || "msg_#{SecureRandom.hex(12)}", type: 'message', role: 'assistant', content: content, model: resolved_model.to_s, stop_reason: format_stop_reason(pipeline_response), stop_sequence: nil, usage: format_usage(tokens) } end |
.format_chunk(text, index: 0) ⇒ Object
Emit Anthropic streaming events for a single text chunk. Returns the SSE lines for the delta event.
38 39 40 41 42 43 44 45 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 38 def self.format_chunk(text, index: 0) log.debug("[llm][api][anthropic] action=format_chunk index=#{index} text=#{text[0, 80]}") { type: 'content_block_delta', index: index, delta: { type: 'text_delta', text: text } } end |
.format_stop_reason(pipeline_response) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 153 def self.format_stop_reason(pipeline_response) tool_calls = extract_tool_calls(pipeline_response) return 'tool_use' if tool_calls.any? return 'end_turn' unless pipeline_response.respond_to?(:stop) stop = pipeline_response.stop reason = stop.is_a?(Hash) ? (stop[:reason] || stop['reason']) : stop.to_s case reason.to_s when 'tool_use' then 'tool_use' when 'max_tokens' then 'max_tokens' when 'stop' then 'stop_sequence' else 'end_turn' end end |
.streaming_events(pipeline_response, model:, request_id: nil, full_text: '') ⇒ Object
Ordered sequence of SSE event hashes for a complete streaming response. Caller emits each via emit_sse_event.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 49 def self.streaming_events(pipeline_response, model:, request_id: nil, full_text: '') log.debug("[llm][api][anthropic] action=streaming_events request_id=#{request_id} text_length=#{full_text.length}") tokens = pipeline_response.respond_to?(:tokens) ? pipeline_response.tokens : nil routing = pipeline_response.respond_to?(:routing) ? (pipeline_response.routing || {}) : {} resolved_model = routing[:model] || routing['model'] || model tool_calls = extract_tool_calls(pipeline_response) content_index = 0 events = [] events << ['message_start', { type: 'message_start', message: { id: request_id || "msg_#{SecureRandom.hex(12)}", type: 'message', role: 'assistant', content: [], model: resolved_model.to_s, stop_reason: nil, stop_sequence: nil, usage: { input_tokens: token_count(tokens, :input), output_tokens: 0 } } }] events << ['content_block_start', { type: 'content_block_start', index: content_index, content_block: { type: 'text', text: '' } }] events << ['ping', { type: 'ping' }] unless full_text.empty? events << ['content_block_delta', { type: 'content_block_delta', index: content_index, delta: { type: 'text_delta', text: full_text } }] end events << ['content_block_stop', { type: 'content_block_stop', index: content_index }] content_index += 1 tool_calls.each do |tc| events << ['content_block_start', { type: 'content_block_start', index: content_index, content_block: { type: 'tool_use', id: tc[:id], name: tc[:name], input: {} } }] events << ['content_block_delta', { type: 'content_block_delta', index: content_index, delta: { type: 'input_json_delta', partial_json: Legion::JSON.dump(tc[:arguments] || {}) } }] events << ['content_block_stop', { type: 'content_block_stop', index: content_index }] content_index += 1 end stop_reason = format_stop_reason(pipeline_response) events << ['message_delta', { type: 'message_delta', delta: { stop_reason: stop_reason, stop_sequence: nil }, usage: { output_tokens: token_count(tokens, :output) } }] events << ['message_stop', { type: 'message_stop' }] events end |
.token_count(tokens, key) ⇒ Object
177 178 179 180 181 182 183 184 185 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 177 def self.token_count(tokens, key) return 0 if tokens.nil? return tokens[key] || tokens[key.to_s] || 0 if tokens.is_a?(Hash) method_name = { input: :input_tokens, output: :output_tokens }[key] return tokens.public_send(method_name) if method_name && tokens.respond_to?(method_name) 0 end |