Module: Legion::LLM::API::StreamAssembler::ChunkAdapter
Defined in: lib/legion/llm/api/stream_assembler.rb
Class Method Summary
- .delta_text(delta) ⇒ Object
A canonical chunk delta is normally a String, but a non-incremental final message can arrive as a Canonical::ContentBlock (or array of them).
- .from_canonical(chunk) ⇒ Object
- .from_legacy(chunk) ⇒ Object
Legacy lex-llm Responses::StreamChunk shape (.content, .thinking).
- .legacy_thinking(chunk) ⇒ Object
- .legacy_tool_calls(chunk) ⇒ Object
- .legacy_tool_calls_if_real(chunk) ⇒ Object
- .normalize(chunk) ⇒ Object
- .safe_call(obj, method) ⇒ Object
respond_to? alone isn’t sufficient for RSpec doubles that stub respond_to? to true but don’t actually implement every getter.
- .safe_parse_args(str) ⇒ Object
- .server_tool_source?(source) ⇒ Boolean
Class Method Details
.delta_text(delta) ⇒ Object
A canonical chunk delta is normally a String, but a non-incremental final message can arrive as a Canonical::ContentBlock (or array of them). Unwrap to text; never `.to_s` a value object onto the wire.

# File 'lib/legion/llm/api/stream_assembler.rb', line 659

def delta_text(delta)
  return '' if delta.nil?
  return delta if delta.is_a?(String)
  return delta.map { |part| delta_text(part) }.join if delta.is_a?(Array)
  return delta.text.to_s if delta.respond_to?(:text) && delta.text
  return delta.content.to_s if delta.respond_to?(:content) && delta.content

  ''
end
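The unwrapping rules can be exercised in isolation. The sketch below copies the method body into a hypothetical `DeltaTextDemo` module and uses a hypothetical `FakeBlock` Struct as a stand-in for Canonical::ContentBlock, whose real interface is not shown on this page; only a `#text` reader is assumed.

```ruby
# Hypothetical stand-in for Canonical::ContentBlock; only #text is assumed.
FakeBlock = Struct.new(:text)

module DeltaTextDemo
  module_function

  # Same logic as ChunkAdapter.delta_text, copied so the example runs on its own.
  def delta_text(delta)
    return '' if delta.nil?
    return delta if delta.is_a?(String)
    return delta.map { |part| delta_text(part) }.join if delta.is_a?(Array)
    return delta.text.to_s if delta.respond_to?(:text) && delta.text
    return delta.content.to_s if delta.respond_to?(:content) && delta.content

    ''
  end
end

# A plain String delta passes through unchanged.
puts DeltaTextDemo.delta_text('Hel')

# An array of value objects is unwrapped part by part and joined.
puts DeltaTextDemo.delta_text([FakeBlock.new('Hel'), FakeBlock.new('lo')])

# A block with no text falls through to the empty string, not an inspect dump.
puts DeltaTextDemo.delta_text(FakeBlock.new(nil)).inspect
```

The final fallback to `''` is what keeps an unrecognised value object from being stringified into the stream.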
.from_canonical(chunk) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 669

def from_canonical(chunk)
  case chunk.type
  when :text_delta
    AdaptedChunk.new(text: delta_text(chunk.delta))
  when :thinking_delta
    AdaptedChunk.new(thinking_text: delta_text(chunk.delta), thinking_signature: chunk.signature)
  when :tool_call_delta
    tc = chunk.tool_call
    return AdaptedChunk.new if tc.nil?

    source = tc.respond_to?(:source) ? tc.source : nil
    AdaptedChunk.new(
      tool_calls: [{
        id: tc.respond_to?(:id) ? tc.id : nil,
        name: tc.respond_to?(:name) ? tc.name : nil,
        arguments: tc.respond_to?(:arguments) ? tc.arguments : {},
        result: tc.respond_to?(:result) ? tc.result : nil,
        server_tool: server_tool_source?(source)
      }]
    )
  else
    AdaptedChunk.new
  end
end
.from_legacy(chunk) ⇒ Object
Legacy lex-llm Responses::StreamChunk shape (.content, .thinking). NB: legacy chunks in Anthropic/Chat lanes are text-only — the executor tool loop accumulates tool calls into the final response, not per-chunk. The Responses lane uses canonical chunks already. Only probe tool_calls when the chunk is the concrete StreamChunk.
# File 'lib/legion/llm/api/stream_assembler.rb', line 699

def from_legacy(chunk)
  text = chunk.respond_to?(:content) ? safe_call(chunk, :content).to_s : chunk.to_s
  thinking_text, thinking_signature = legacy_thinking(chunk)
  AdaptedChunk.new(
    text: text,
    thinking_text: thinking_text,
    thinking_signature: thinking_signature,
    tool_calls: legacy_tool_calls_if_real(chunk)
  )
end
.legacy_thinking(chunk) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 728

def legacy_thinking(chunk)
  return [nil, nil] unless chunk.respond_to?(:thinking)

  thinking = safe_call(chunk, :thinking)
  return [nil, nil] if thinking.nil?

  if thinking.is_a?(Hash)
    normalized = thinking.transform_keys { |k| k.respond_to?(:to_sym) ? k.to_sym : k }
    [normalized[:content] || normalized[:text] || normalized[:thinking], normalized[:signature]]
  elsif thinking.respond_to?(:content) && thinking.content
    [thinking.content, thinking.respond_to?(:signature) ? thinking.signature : nil]
  elsif thinking.respond_to?(:text)
    # Legacy lex-llm Thinking exposes #text/#signature (no #content);
    # extract the text, never the Ruby `#<...Thinking:0x...>` inspect.
    [thinking.text, thinking.respond_to?(:signature) ? thinking.signature : nil]
  else
    [thinking.is_a?(String) ? thinking : nil, nil]
  end
end
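To see how each thinking shape resolves to a `[text, signature]` pair, here is a minimal sketch. It mirrors the branch order of the method, but the helper name `extract_thinking` and the `FakeThinking` Struct are hypothetical, and the helper takes the thinking value directly instead of a chunk to keep the example short.

```ruby
# Hypothetical stand-in for the legacy Thinking object (#text/#signature, no #content).
FakeThinking = Struct.new(:text, :signature)

# Mirrors the branch order of legacy_thinking for a bare thinking value.
def extract_thinking(thinking)
  return [nil, nil] if thinking.nil?

  if thinking.is_a?(Hash)
    # String and Symbol keys are treated alike.
    normalized = thinking.transform_keys { |k| k.respond_to?(:to_sym) ? k.to_sym : k }
    [normalized[:content] || normalized[:text] || normalized[:thinking], normalized[:signature]]
  elsif thinking.respond_to?(:content) && thinking.content
    [thinking.content, thinking.respond_to?(:signature) ? thinking.signature : nil]
  elsif thinking.respond_to?(:text)
    [thinking.text, thinking.respond_to?(:signature) ? thinking.signature : nil]
  else
    # Only a bare String is accepted as text; anything else yields no text.
    [thinking.is_a?(String) ? thinking : nil, nil]
  end
end

p extract_thinking({ 'text' => 'step 1', 'signature' => 'sig' })
p extract_thinking(FakeThinking.new('step 2', 'sig2'))
p extract_thinking('plain')
p extract_thinking(42)
```

The four calls cover the Hash, object, String, and unrecognised cases in turn; the last one returns `[nil, nil]` rather than the inspect string of the value.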
.legacy_tool_calls(chunk) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 748

def legacy_tool_calls(chunk)
  return [] unless chunk.respond_to?(:tool_calls)

  calls = safe_call(chunk, :tool_calls)
  return [] if calls.nil? || (calls.respond_to?(:empty?) && calls.empty?)

  # Legacy yields tool_calls as { id => obj }. Canonical post-P3 yields arrays.
  iterable = calls.is_a?(Hash) ? calls.values : Array(calls)
  iterable.filter_map do |tc|
    name = tc.respond_to?(:name) ? tc.name.to_s : ''
    args = tc.respond_to?(:arguments) ? tc.arguments : {}
    next if name.empty? && (args.nil? || (args.respond_to?(:empty?) && args.empty?))

    {
      id: tc.respond_to?(:id) ? tc.id : nil,
      name: name,
      arguments: args.is_a?(String) ? safe_parse_args(args) : args,
      server_tool: false
    }
  end
end
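The Hash-versus-Array handling and the empty-entry filter can be illustrated with a reduced sketch. `FakeToolCall` and `normalize_tool_calls` are hypothetical names, and the `respond_to?` probing and String-argument parsing of the real method are left out for brevity.

```ruby
# Hypothetical stand-in for a provider tool-call object.
FakeToolCall = Struct.new(:id, :name, :arguments)

# Reduced version of the normalization loop: accepts { id => obj } or [obj, ...].
def normalize_tool_calls(calls)
  return [] if calls.nil? || (calls.respond_to?(:empty?) && calls.empty?)

  iterable = calls.is_a?(Hash) ? calls.values : Array(calls)
  iterable.filter_map do |tc|
    name = tc.name.to_s
    args = tc.arguments
    # Entries with neither a name nor arguments carry no information; drop them.
    next if name.empty? && (args.nil? || (args.respond_to?(:empty?) && args.empty?))

    { id: tc.id, name: name, arguments: args, server_tool: false }
  end
end

by_id = {
  'call_1' => FakeToolCall.new('call_1', 'search', { q: 'ruby' }),
  'call_2' => FakeToolCall.new('call_2', '', {}) # placeholder entry, filtered out
}
as_list = [FakeToolCall.new('call_1', 'search', { q: 'ruby' })]

p normalize_tool_calls(by_id) == normalize_tool_calls(as_list) # => true
```

Both input shapes produce the same single-entry array, since the nameless, argument-less `call_2` is skipped by `filter_map`.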
.legacy_tool_calls_if_real(chunk) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 710

def legacy_tool_calls_if_real(chunk)
  return [] unless defined?(::Legion::Extensions::Llm::Responses::StreamChunk) &&
                   chunk.is_a?(::Legion::Extensions::Llm::Responses::StreamChunk)

  legacy_tool_calls(chunk)
end
.normalize(chunk) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 647

def normalize(chunk)
  return nil if chunk.nil?
  return from_canonical(chunk) if defined?(::Legion::Extensions::Llm::Canonical::Chunk) &&
                                  chunk.is_a?(::Legion::Extensions::Llm::Canonical::Chunk)

  from_legacy(chunk)
end
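The `defined?(...) && chunk.is_a?(...)` guard lets the adapter run whether or not the extension that provides the canonical class is loaded. A small sketch of that pattern follows; `HypotheticalExt::Chunk` is an invented constant used purely for illustration.

```ruby
# Returns a truthy value only when the optional class exists AND the chunk is one.
def canonical?(chunk)
  defined?(::HypotheticalExt::Chunk) && chunk.is_a?(::HypotheticalExt::Chunk)
end

# The constant does not exist yet: defined? returns nil instead of raising NameError.
p canonical?('plain text chunk')

# Simulate the optional extension being loaded later.
module HypotheticalExt
  class Chunk; end
end

p canonical?(HypotheticalExt::Chunk.new) # => true
p canonical?('plain text chunk')         # => false
```

Referencing the constant directly in `is_a?` without the `defined?` check would raise `NameError` in the first call.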
.safe_call(obj, method) ⇒ Object
respond_to? alone isn’t sufficient for RSpec doubles that stub respond_to? to true but don’t actually implement every getter. RSpec::Mocks::MockExpectationError descends from Exception (not StandardError), so we widen the rescue here just for the adapter entry point.
# File 'lib/legion/llm/api/stream_assembler.rb', line 722

def safe_call(obj, method)
  obj.public_send(method)
rescue Exception # rubocop:disable Lint/RescueException -- isolating provider chunk shape probing
  nil
end
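The difference between the narrow and widened rescue is easy to reproduce without RSpec. In the sketch below, `FakeMockError` and `LyingDouble` are hypothetical stand-ins: the error inherits from Exception directly, as the note above says RSpec::Mocks::MockExpectationError does, and the double claims to respond to everything.

```ruby
# Hypothetical error that inherits from Exception, not StandardError.
class FakeMockError < Exception; end # rubocop:disable Lint/InheritException

# Hypothetical double: respond_to? always says yes, but the getter raises.
class LyingDouble
  def respond_to?(*)
    true
  end

  def content
    raise FakeMockError, 'received unexpected message :content'
  end
end

# Narrow variant: misses anything outside the StandardError hierarchy.
def narrow_call(obj, method)
  obj.public_send(method)
rescue StandardError
  nil
end

# Widened variant, matching the shape of safe_call.
def wide_call(obj, method)
  obj.public_send(method)
rescue Exception # rubocop:disable Lint/RescueException
  nil
end

double = LyingDouble.new
p wide_call(double, :content) # => nil

begin
  narrow_call(double, :content)
rescue FakeMockError => e
  puts "escaped the narrow rescue: #{e.message}"
end
```

Rescuing Exception also swallows signals such as Interrupt, which is presumably why the widening is confined to this one probing helper.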
.safe_parse_args(str) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 770

def safe_parse_args(str)
  return {} if str.to_s.empty?

  Legion::JSON.parse(str, symbolize_names: true)
rescue StandardError
  str
end
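The three outcomes (empty input, valid JSON, unparseable input) can be tried with the standard library. This sketch swaps in stdlib `JSON.parse` for `Legion::JSON.parse`, on the assumption that the two behave alike for these inputs; `parse_args_sketch` is a hypothetical name.

```ruby
require 'json'

# Stand-in using stdlib JSON; Legion::JSON.parse is assumed to behave similarly.
def parse_args_sketch(str)
  return {} if str.to_s.empty?

  JSON.parse(str, symbolize_names: true)
rescue StandardError
  # Unparseable input (for example a partial fragment) is returned untouched.
  str
end

p parse_args_sketch('')             # empty input becomes an empty Hash
p parse_args_sketch('{"q":"ruby"}') # valid JSON becomes a Hash with Symbol keys
p parse_args_sketch('{"q":')        # truncated JSON comes back as the same String
```

Callers therefore have to accept either a Hash or a String for `arguments`, because a failed parse falls back to the raw input rather than raising.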
.server_tool_source?(source) ⇒ Boolean
# File 'lib/legion/llm/api/stream_assembler.rb', line 778

def server_tool_source?(source)
  return false if source.nil?

  type = source.is_a?(Hash) ? (source[:type] || source['type']) : source
  %i[special registry extension mcp].include?(type&.to_sym)
end
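A short sketch of the accepted source shapes, using a copy of the method under the hypothetical name `server_tool_source_sketch?`. The `'client'` value is an invented example of a type outside the list.

```ruby
# Copy of the predicate so the example runs on its own.
def server_tool_source_sketch?(source)
  return false if source.nil?

  # A Hash may carry the type under a Symbol or a String key; anything else is the type itself.
  type = source.is_a?(Hash) ? (source[:type] || source['type']) : source
  %i[special registry extension mcp].include?(type&.to_sym)
end

p server_tool_source_sketch?(:mcp)                     # => true
p server_tool_source_sketch?('extension')              # => true
p server_tool_source_sketch?({ 'type' => 'registry' }) # => true
p server_tool_source_sketch?('client')                 # => false
p server_tool_source_sketch?({})                       # => false
```

A Hash without a type yields `nil`, which the safe-navigation call passes through to `include?` as a non-match.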