Module: Legion::LLM::API::Translators::AnthropicResponse
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/api/translators/anthropic_response.rb
Class Method Summary
- .extract_tool_calls(pipeline_response) ⇒ Object
-
.format(pipeline_response, model:, request_id: nil) ⇒ Object
Format an internal pipeline response into the Anthropic Messages API shape.
-
.format_chunk(text, index: 0) ⇒ Object
Build the Anthropic `content_block_delta` event for a single streamed text chunk.
- .format_stop_reason(pipeline_response) ⇒ Object
- .normalize_thinking_payload(value) ⇒ Object
-
.streaming_events(pipeline_response, model:, request_id: nil, full_text: '') ⇒ Object
Ordered list of `[event_name, payload]` SSE event pairs for a complete streaming response.
- .thinking_content_block(pipeline_response) ⇒ Object
- .thinking_payload(pipeline_response) ⇒ Object
- .token_count(tokens, key) ⇒ Object
Class Method Details
.extract_tool_calls(pipeline_response) ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 275
# Normalize the tool calls attached to a pipeline response into plain hashes.
#
# Each tool call may be an object exposing reader methods (id, name, ...) or a
# Hash keyed by symbols or strings; every field is read with the same rule:
# reader method if present, otherwise symbol key, then string key, then default.
#
# @param pipeline_response [Object] response that may respond to #tools
# @return [Array<Hash>] one hash per tool call with keys :id, :name,
#   :arguments, :result, :source and :legionio; empty when the response has
#   no #tools reader
def self.extract_tool_calls(pipeline_response)
  return [] unless pipeline_response.respond_to?(:tools)

  Array(pipeline_response.tools).map do |tc|
    read = lambda do |field, default = nil|
      next tc.public_send(field) if tc.respond_to?(field)

      tc[field] || tc[field.to_s] || default
    end

    source = read.call(:source, {})
    is_legionio = legionio_tool_source?(source)

    # Clients pair tool_use/tool_result blocks by id, so never emit a blank id.
    # Server-side (LegionIO) calls get the srvtoolu_ prefix, client calls toolu_.
    id = read.call(:id)
    id = "#{is_legionio ? 'srvtoolu' : 'toolu'}_#{SecureRandom.hex(10)}" if id.nil? || id.to_s.empty?

    {
      id: id,
      name: read.call(:name, ''),
      arguments: read.call(:arguments, {}),
      result: read.call(:result),
      source: source,
      legionio: is_legionio
    }
  end
end
.format(pipeline_response, model:, request_id: nil) ⇒ Object
Format an internal pipeline response into the Anthropic Messages API shape.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 14
# Format an internal pipeline response into the Anthropic Messages API shape.
#
# @param pipeline_response [Object] pipeline result; #message is read
#   unconditionally, #tokens and #routing only when present
# @param model [String, Symbol] requested model, used when routing does not
#   name the model that served the request
# @param request_id [String, nil] reused as the message id when given;
#   otherwise a random "msg_" id is generated
# @return [Hash] a non-streaming Messages API response body
def self.format(pipeline_response, model:, request_id: nil)
  log.debug("[llm][api][anthropic] action=format request_id=#{request_id} model=#{model}")
  # NOTE(review): the published source was truncated to `pipeline_response.`
  # here, which parses as a call to #content= and hands a nil message to
  # extract_content. `#message` is the presumed original reader -- confirm.
  msg = pipeline_response.message
  content = extract_content(msg, pipeline_response)
  tokens = pipeline_response.respond_to?(:tokens) ? pipeline_response.tokens : nil
  routing = pipeline_response.respond_to?(:routing) ? (pipeline_response.routing || {}) : {}
  # Prefer the model routing actually used over the one the client asked for.
  resolved_model = routing[:model] || routing['model'] || model

  {
    id: request_id || "msg_#{SecureRandom.hex(12)}",
    type: 'message',
    role: 'assistant',
    content: content,
    model: resolved_model.to_s,
    stop_reason: format_stop_reason(pipeline_response),
    stop_sequence: nil,
    usage: format_usage(tokens)
  }
end
.format_chunk(text, index: 0) ⇒ Object
Build the Anthropic `content_block_delta` event for a single streamed text chunk. Returns the event payload hash (not serialized SSE lines).
38 39 40 41 42 43 44 45 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 38
# Build the Anthropic `content_block_delta` event for one streamed text chunk.
#
# Only the event payload hash is returned; nothing is written here.
#
# @param text [String] chunk text (its first 80 characters are debug-logged)
# @param index [Integer] index of the content block the delta belongs to
# @return [Hash] content_block_delta payload carrying a text_delta
def self.format_chunk(text, index: 0)
  preview = text[0, 80]
  log.debug("[llm][api][anthropic] action=format_chunk index=#{index} text=#{preview}")

  delta = { type: 'text_delta', text: text }
  { type: 'content_block_delta', index: index, delta: delta }
end
.format_stop_reason(pipeline_response) ⇒ Object
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 304
# Derive the Anthropic `stop_reason` for a pipeline response.
#
# Pending tool calls take priority over the provider's own stop reason:
# a LegionIO (server-side) tool without a result yields 'pause_turn', and a
# client-side tool without a result yields 'tool_use'. Otherwise the
# provider's reason (#stop, either a String or a Hash with :reason/'reason')
# is mapped onto Messages API values; OpenAI-style finish reasons
# (length, tool_calls, content_filter) are accepted as aliases.
#
# @param pipeline_response [Object] may respond to #tools, #stop and #stop_sequence
# @return [String] one of 'pause_turn', 'tool_use', 'max_tokens', 'refusal',
#   'stop_sequence' or 'end_turn'
def self.format_stop_reason(pipeline_response)
  pending = extract_tool_calls(pipeline_response).select { |tc| tc[:result].nil? }
  # A LegionIO tool still lacking a result is executing server-side. Once all
  # of them have results the tool loop is complete and the turn ends normally.
  return 'pause_turn' if pending.any? { |tc| tc[:legionio] == true }
  # Passthrough tool_use blocks without results must be executed by the client.
  return 'tool_use' if pending.any? { |tc| tc[:legionio] != true }
  return 'end_turn' unless pipeline_response.respond_to?(:stop)

  stop = pipeline_response.stop
  reason = stop.is_a?(Hash) ? (stop[:reason] || stop['reason']) : stop

  case reason.to_s
  when 'tool_use', 'tool_calls' then 'tool_use'
  when 'max_tokens', 'length' then 'max_tokens'
  # 'content_filter' is not a Messages API stop_reason; 'refusal' is the
  # documented value for policy-based refusals.
  when 'refusal', 'content_filter' then 'refusal'
  when 'stop_sequence' then 'stop_sequence'
  when 'stop'
    hit = pipeline_response.respond_to?(:stop_sequence) && pipeline_response.stop_sequence
    hit ? 'stop_sequence' : 'end_turn'
  else 'end_turn'
  end
end
.normalize_thinking_payload(value) ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 247
# Coerce a raw thinking value into a compact, symbol-keyed hash.
#
# Hash input: keys are symbolized; the text is the first truthy value among
# :content, :text and :thinking, and :redacted is true when the hash says so
# or its :type is 'redacted_thinking'.
# Any other value: the text is read from #content, then #text; a value with
# neither reader is itself used as the text. #signature and #data are read
# when available, and :redacted is false.
#
# content, signature and data are stringified when non-nil.
#
# @param value [Hash, Object, nil] raw thinking payload
# @return [Hash, nil] keys :content, :signature, :data, :redacted, :type with
#   nil entries removed; nil when value is falsy or content, signature and
#   data are all blank
def self.normalize_thinking_payload(value)
  return nil unless value

  fields =
    if value.is_a?(Hash)
      h = value.transform_keys { |k| k.respond_to?(:to_sym) ? k.to_sym : k }
      {
        content: h[:content] || h[:text] || h[:thinking],
        signature: h[:signature],
        data: h[:data],
        redacted: h[:redacted] || h[:type].to_s == 'redacted_thinking',
        type: h[:type]
      }
    else
      text = value.content if value.respond_to?(:content)
      text = value.text if text.nil? && value.respond_to?(:text)
      # Bare values (e.g. a String) carry the thinking text directly.
      text = value unless value.respond_to?(:content) || value.respond_to?(:text)
      {
        content: text,
        signature: (value.signature if value.respond_to?(:signature)),
        data: (value.data if value.respond_to?(:data)),
        redacted: false,
        type: nil
      }
    end

  text_keys = %i[content signature data]
  text_keys.each { |k| fields[k] = fields[k]&.to_s }
  return nil if text_keys.all? { |k| fields[k].to_s.empty? }

  fields.compact
end
.streaming_events(pipeline_response, model:, request_id: nil, full_text: '') ⇒ Object
Ordered list of `[event_name, payload]` pairs for a complete streaming response. The caller emits each pair via emit_sse_event.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 49
# Build the ordered SSE events for a complete (already generated) response.
#
# Each element is an `[event_name, payload]` pair; the caller emits them in
# order. The sequence is: message_start, ping, an optional text block, one
# block per tool call (plus a result block for each LegionIO server tool that
# has a result), message_delta and message_stop.
#
# @param pipeline_response [Object] may respond to #tokens, #routing, #tools
# @param model [String, Symbol] fallback model name when routing names none
# @param request_id [String, nil] reused as the message id when given
# @param full_text [String, nil] complete assistant text; nil is treated as ''
# @return [Array<Array(String, Hash)>] ordered event pairs
def self.streaming_events(pipeline_response, model:, request_id: nil, full_text: '')
  full_text = full_text.to_s
  log.debug("[llm][api][anthropic] action=streaming_events request_id=#{request_id} text_length=#{full_text.length}")
  tokens = pipeline_response.respond_to?(:tokens) ? pipeline_response.tokens : nil
  routing = pipeline_response.respond_to?(:routing) ? (pipeline_response.routing || {}) : {}
  resolved_model = routing[:model] || routing['model'] || model
  tool_calls = extract_tool_calls(pipeline_response)

  events = [
    ['message_start', {
      type: 'message_start',
      message: {
        id: request_id || "msg_#{SecureRandom.hex(12)}",
        type: 'message',
        role: 'assistant',
        content: [],
        model: resolved_model.to_s,
        stop_reason: nil,
        stop_sequence: nil,
        usage: { input_tokens: token_count(tokens, :input), output_tokens: 0 }
      }
    }],
    ['ping', { type: 'ping' }]
  ]
  content_index = 0

  # Emit a text block only when there's real text content. When tool calls
  # are present and the text is just JSON arguments (common with vLLM/qwen
  # forced tool choices), skip the text block so the stream carries only
  # tool_use blocks with stop_reason=tool_use.
  text_is_tool_arguments = tool_calls.any? && text_looks_like_tool_json?(full_text)
  unless full_text.empty? || text_is_tool_arguments
    events.concat(streaming_block_events(content_index,
                                         { type: 'text', text: '' },
                                         { type: 'text_delta', text: full_text }))
    content_index += 1
  end

  tool_calls.each do |tc|
    input = tc[:arguments] || {}
    input_delta = { type: 'input_json_delta', partial_json: Legion::JSON.dump(input) }

    if tc[:legionio] == true
      # Server tool: emit the call and, when available, its result.
      events.concat(streaming_block_events(content_index,
                                           { type: 'server_tool_use', id: tc[:id], name: tc[:name], input: input },
                                           input_delta))
      content_index += 1

      result = tc[:result]
      next unless result

      # NOTE(review): this delta nests `type: 'content_block_delta'`, which is
      # not a Messages API delta type (text_delta, input_json_delta, ...).
      # Kept as-is for existing consumers -- confirm the intended shape.
      events.concat(streaming_block_events(
                      content_index,
                      { type: 'server_tool_result', id: tc[:id], content: [] },
                      { type: 'content_block_delta',
                        content: [{ type: 'text', text: streaming_tool_result_text(result) }] }
                    ))
    else
      events.concat(streaming_block_events(content_index,
                                           { type: 'tool_use', id: tc[:id], name: tc[:name], input: input },
                                           input_delta))
    end
    content_index += 1
  end

  events << ['message_delta', {
    type: 'message_delta',
    delta: { stop_reason: format_stop_reason(pipeline_response), stop_sequence: nil },
    usage: { output_tokens: token_count(tokens, :output) }
  }]
  events << ['message_stop', { type: 'message_stop' }]
  events
end

# The content_block_start / content_block_delta / content_block_stop triple
# for one content block at +index+, as `[event_name, payload]` pairs.
def self.streaming_block_events(index, content_block, delta)
  [
    ['content_block_start', { type: 'content_block_start', index: index, content_block: content_block }],
    ['content_block_delta', { type: 'content_block_delta', index: index, delta: delta }],
    ['content_block_stop', { type: 'content_block_stop', index: index }]
  ]
end

# Render a server tool result as text: Strings pass through, anything else is
# JSON-encoded, falling back to #to_s when encoding raises.
def self.streaming_tool_result_text(result)
  return result if result.is_a?(String)

  Legion::JSON.dump(result)
rescue StandardError
  result.to_s
end
.thinking_content_block(pipeline_response) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 220
# Build the Anthropic thinking content block for a pipeline response.
#
# Redacted thinking (flagged by :redacted or a 'redacted_thinking' :type)
# becomes a redacted_thinking block carrying the opaque data, falling back to
# the signature. Anything else becomes a thinking block whose signature is
# attached only when non-empty.
#
# @param pipeline_response [Object] response passed to thinking_payload
# @return [Hash, nil] nil when there is no thinking payload, or when a
#   redacted payload has neither data nor signature
def self.thinking_content_block(pipeline_response)
  payload = thinking_payload(pipeline_response)
  return nil unless payload

  if payload[:redacted] || payload[:type].to_s == 'redacted_thinking'
    opaque = (payload[:data] || payload[:signature]).to_s
    return opaque.empty? ? nil : { type: 'redacted_thinking', data: opaque }
  end

  signature = payload[:signature].to_s
  block = { type: 'thinking', thinking: payload[:content].to_s }
  block[:signature] = signature unless signature.empty?
  block
end
.thinking_payload(pipeline_response) ⇒ Object
236 237 238 239 240 241 242 243 244 245 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 236
# Read and normalize the thinking data attached to a pipeline response.
#
# Reading #thinking is best-effort: a reader that raises is treated as
# "no thinking" (and debug-logged) so the rest of the response can still be
# formatted.
#
# @param pipeline_response [Object] may respond to #thinking
# @return [Hash, nil] result of normalize_thinking_payload, or nil when the
#   response has no #thinking reader or the reader raised
def self.thinking_payload(pipeline_response)
  return nil unless pipeline_response.respond_to?(:thinking)

  thinking_data = begin
    pipeline_response.thinking
  rescue StandardError, NotImplementedError => e
    # Interrupt, SystemExit and NoMemoryError are deliberately not rescued.
    # NotImplementedError is a ScriptError (not a StandardError), so it is
    # listed explicitly to keep unimplemented readers best-effort.
    log.debug("[llm][api][anthropic] action=thinking_payload error=#{e.class}: #{e.message}")
    nil
  end
  normalize_thinking_payload(thinking_data)
end
.token_count(tokens, key) ⇒ Object
340 341 342 343 344 345 346 347 348 |
# File 'lib/legion/llm/api/translators/anthropic_response.rb', line 340
# Read an input/output token count from a usage Hash or usage object.
#
# Hashes may use the short keys (:input / 'input') or the long keys
# (:input_tokens / 'input_tokens'); short keys win when both are present.
# Objects are read through #input_tokens / #output_tokens.
#
# @param tokens [Hash, Object, nil] usage information
# @param key [Symbol] :input or :output
# @return [Object] the stored count, or 0 when it is missing or nil
def self.token_count(tokens, key)
  return 0 if tokens.nil?

  long_key = { input: :input_tokens, output: :output_tokens }[key]

  if tokens.is_a?(Hash)
    count = tokens[key] || tokens[key.to_s]
    count ||= tokens[long_key] || tokens[long_key.to_s] if long_key
    return count || 0
  end

  # A reader returning nil must not leak nil into the usage payload.
  return tokens.public_send(long_key) || 0 if long_key && tokens.respond_to?(long_key)

  0
end