Module: Legion::LLM::API::StreamAssembler::ChunkAdapter

Defined in:
lib/legion/llm/api/stream_assembler.rb

Class Method Summary collapse

Class Method Details

.delta_text(delta) ⇒ Object

A canonical chunk delta is normally a String, but a non-incremental final message can arrive as a Canonical::ContentBlock (or an array of them). Unwrap to text — never `.to_s` a value object onto the wire.



659
660
661
662
663
664
665
666
667
# File 'lib/legion/llm/api/stream_assembler.rb', line 659

# Unwraps a chunk delta to plain text.
#
# Strings are returned as-is and arrays are unwrapped element by element and
# concatenated. Any other object is probed for a truthy +#text+, then a truthy
# +#content+. When that value is itself an Array or another object exposing
# +#text+/+#content+, it is unwrapped recursively rather than stringified, so
# a nested value object's inspect output never ends up in the text.
#
# @param delta [String, Array, #text, #content, nil] the raw delta value
# @return [String] the extracted text, or '' when nothing textual is found
def delta_text(delta)
  return '' if delta.nil?
  return delta if delta.is_a?(String)
  return delta.map { |part| delta_text(part) }.join if delta.is_a?(Array)

  %i[text content].each do |reader|
    next unless delta.respond_to?(reader)

    value = delta.public_send(reader)
    next unless value

    # Scalars keep the previous `to_s` behaviour; containers and nested
    # blocks are unwrapped instead of being stringified.
    nested = value.is_a?(Array) || value.respond_to?(:text) || value.respond_to?(:content)
    return nested ? delta_text(value) : value.to_s
  end

  ''
end

.from_canonical(chunk) ⇒ Object



669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
# File 'lib/legion/llm/api/stream_assembler.rb', line 669

# Builds an AdaptedChunk from a canonical chunk, dispatching on +chunk.type+.
#
# * +:text_delta+      -> text unwrapped from +chunk.delta+
# * +:thinking_delta+  -> thinking text from +chunk.delta+ plus +chunk.signature+
# * +:tool_call_delta+ -> a single tool-call hash built from +chunk.tool_call+
#   (an empty AdaptedChunk when there is no tool call)
# * anything else      -> an empty AdaptedChunk
#
# @param chunk [#type] canonical chunk
# @return [AdaptedChunk]
def from_canonical(chunk)
  case chunk.type
  when :text_delta
    unwrapped = delta_text(chunk.delta)
    AdaptedChunk.new(text: unwrapped)
  when :thinking_delta
    reasoning = delta_text(chunk.delta)
    AdaptedChunk.new(thinking_text: reasoning, thinking_signature: chunk.signature)
  when :tool_call_delta
    call = chunk.tool_call
    if call.nil?
      AdaptedChunk.new
    else
      # Each attribute is optional on the tool-call object; read it only
      # when the object exposes the reader, otherwise use the fallback.
      read = ->(attr, fallback) { call.respond_to?(attr) ? call.public_send(attr) : fallback }

      origin = read.call(:source, nil)
      entry = {
        id:          read.call(:id, nil),
        name:        read.call(:name, nil),
        arguments:   read.call(:arguments, {}),
        result:      read.call(:result, nil),
        server_tool: server_tool_source?(origin)
      }
      AdaptedChunk.new(tool_calls: [entry])
    end
  else
    AdaptedChunk.new
  end
end

.from_legacy(chunk) ⇒ Object

Legacy lex-llm Responses::StreamChunk shape (.content, .thinking). NB: legacy chunks in Anthropic/Chat lanes are text-only — the executor tool loop accumulates tool calls into the final response, not per-chunk. The Responses lane uses canonical chunks already. Only probe tool_calls when the chunk is the concrete StreamChunk.



699
700
701
702
703
704
705
706
707
708
# File 'lib/legion/llm/api/stream_assembler.rb', line 699

# Builds an AdaptedChunk from a legacy-shaped chunk.
#
# The text comes from +chunk.content+ (read through +safe_call+) when the
# chunk exposes that reader, otherwise from +chunk.to_s+. Thinking text and
# signature come from +legacy_thinking+; tool calls come from
# +legacy_tool_calls_if_real+.
#
# @param chunk [Object] legacy chunk
# @return [AdaptedChunk]
def from_legacy(chunk)
  body = if chunk.respond_to?(:content)
           safe_call(chunk, :content).to_s
         else
           chunk.to_s
         end
  reasoning, signature = legacy_thinking(chunk)
  calls = legacy_tool_calls_if_real(chunk)

  AdaptedChunk.new(text: body, thinking_text: reasoning, thinking_signature: signature, tool_calls: calls)
end

.legacy_thinking(chunk) ⇒ Object



728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
# File 'lib/legion/llm/api/stream_assembler.rb', line 728

# Extracts a +[thinking_text, thinking_signature]+ pair from a legacy chunk.
#
# Returns +[nil, nil]+ when the chunk has no +#thinking+ reader or the reader
# yields nil. Otherwise the thinking value is interpreted by shape:
#
# * Hash   -> keys are symbolized where possible; text is the first truthy of
#   +:content+, +:text+, +:thinking+; signature is +:signature+
# * object with a truthy +#content+ -> that content (+#signature+ if exposed)
# * object with +#text+ -> that text (+#signature+ if exposed); the text is
#   extracted rather than the object's inspect string
# * String -> the string itself, no signature
# * anything else -> +[nil, nil]+
#
# @param chunk [Object] legacy chunk
# @return [Array(Object, Object)] text and signature, either may be nil
def legacy_thinking(chunk)
  return [nil, nil] unless chunk.respond_to?(:thinking)

  payload = safe_call(chunk, :thinking)
  return [nil, nil] if payload.nil?

  if payload.is_a?(Hash)
    by_symbol = payload.to_h { |key, value| [key.respond_to?(:to_sym) ? key.to_sym : key, value] }
    body = by_symbol[:content]
    body ||= by_symbol[:text]
    body ||= by_symbol[:thinking]
    return [body, by_symbol[:signature]]
  end

  signature_of = ->(obj) { obj.respond_to?(:signature) ? obj.signature : nil }

  content = payload.content if payload.respond_to?(:content)
  return [content, signature_of.call(payload)] if content
  return [payload.text, signature_of.call(payload)] if payload.respond_to?(:text)

  [payload.is_a?(String) ? payload : nil, nil]
end

.legacy_tool_calls(chunk) ⇒ Object



748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
# File 'lib/legion/llm/api/stream_assembler.rb', line 748

# Converts a chunk's +#tool_calls+ into an array of tool-call hashes.
#
# Returns +[]+ when the chunk has no +#tool_calls+ reader, or when the value
# is nil or empty. A Hash of tool calls contributes its values; anything else
# is coerced with +Array()+. Entries that have neither a name nor any
# arguments are skipped. String arguments are passed through
# +safe_parse_args+. Every emitted entry has +server_tool: false+.
#
# @param chunk [Object] legacy chunk
# @return [Array<Hash>] hashes with +:id+, +:name+, +:arguments+, +:server_tool+
def legacy_tool_calls(chunk)
  return [] unless chunk.respond_to?(:tool_calls)

  raw = safe_call(chunk, :tool_calls)
  return [] if raw.nil?
  return [] if raw.respond_to?(:empty?) && raw.empty?

  # Legacy chunks key tool calls by id ({ id => obj }); canonical ones use arrays.
  entries = raw.is_a?(Hash) ? raw.values : Array(raw)

  entries.each_with_object([]) do |call, adapted|
    label = call.respond_to?(:name) ? call.name.to_s : ''
    payload = call.respond_to?(:arguments) ? call.arguments : {}
    next if label.empty? && (payload.nil? || (payload.respond_to?(:empty?) && payload.empty?))

    parsed = payload.is_a?(String) ? safe_parse_args(payload) : payload
    identifier = call.respond_to?(:id) ? call.id : nil
    adapted << { id: identifier, name: label, arguments: parsed, server_tool: false }
  end
end

.legacy_tool_calls_if_real(chunk) ⇒ Object



710
711
712
713
714
715
# File 'lib/legion/llm/api/stream_assembler.rb', line 710

# Probes tool calls only when +chunk+ is a concrete
# +Legion::Extensions::Llm::Responses::StreamChunk+.
#
# Returns +[]+ when that constant is not defined or the chunk is not an
# instance of it; otherwise delegates to +legacy_tool_calls+.
#
# @param chunk [Object] legacy chunk
# @return [Array] tool-call hashes, possibly empty
def legacy_tool_calls_if_real(chunk)
  concrete = defined?(::Legion::Extensions::Llm::Responses::StreamChunk) &&
             chunk.is_a?(::Legion::Extensions::Llm::Responses::StreamChunk)

  concrete ? legacy_tool_calls(chunk) : []
end

.normalize(chunk) ⇒ Object



647
648
649
650
651
652
653
654
# File 'lib/legion/llm/api/stream_assembler.rb', line 647

# Adapts a streamed chunk of either supported shape.
#
# nil passes through as nil. Instances of
# +Legion::Extensions::Llm::Canonical::Chunk+ (when that constant is defined)
# go through +from_canonical+; everything else goes through +from_legacy+.
#
# @param chunk [Object, nil] streamed chunk
# @return [Object, nil] the adapter result, or nil for a nil chunk
def normalize(chunk)
  return if chunk.nil?

  canonical = defined?(::Legion::Extensions::Llm::Canonical::Chunk) &&
              chunk.is_a?(::Legion::Extensions::Llm::Canonical::Chunk)

  if canonical
    from_canonical(chunk)
  else
    from_legacy(chunk)
  end
end

.safe_call(obj, method) ⇒ Object

respond_to? alone isn’t sufficient for RSpec doubles that stub respond_to? to true but don’t actually implement every getter. RSpec::Mocks::MockExpectationError descends from Exception (not StandardError), so we widen the rescue here just for the adapter entry point.



722
723
724
725
726
# File 'lib/legion/llm/api/stream_assembler.rb', line 722

# Calls a public reader on +obj+ and treats any ordinary failure as "no value".
#
# The rescue is deliberately wider than StandardError:
# RSpec::Mocks::MockExpectationError descends from Exception, and doubles may
# claim to respond to a getter they do not implement. Process-control
# exceptions — SystemExit, SignalException (which includes Interrupt) and
# NoMemoryError — are re-raised so that probing a chunk can never swallow a
# shutdown request, a signal, or an out-of-memory condition.
#
# @param obj [Object] the object to probe
# @param method [Symbol, String] name of the public method to call
# @return [Object, nil] the method's result, or nil when the call raised
# @raise [SystemExit, SignalException, NoMemoryError] propagated unchanged
def safe_call(obj, method)
  obj.public_send(method)
rescue SystemExit, SignalException, NoMemoryError
  raise
rescue Exception # rubocop:disable Lint/RescueException -- isolating provider chunk shape probing
  nil
end

.safe_parse_args(str) ⇒ Object



770
771
772
773
774
775
776
# File 'lib/legion/llm/api/stream_assembler.rb', line 770

# Parses a tool-call argument string with +Legion::JSON.parse+, using
# symbolized names.
#
# An empty or nil input yields +{}+. If anything raises a StandardError, the
# original +str+ is returned unchanged.
#
# @param str [String, nil] raw argument payload
# @return [Object] the parsed value, +{}+ for blank input, or +str+ on failure
def safe_parse_args(str)
  if str.to_s.empty?
    {}
  else
    Legion::JSON.parse(str, symbolize_names: true)
  end
rescue StandardError
  str
end

.server_tool_source?(source) ⇒ Boolean

Returns:

  • (Boolean)


778
779
780
781
782
783
# File 'lib/legion/llm/api/stream_assembler.rb', line 778

# Reports whether a tool-call source denotes a server-side tool.
#
# The source type is taken from the +:type+ / +'type'+ key when +source+ is a
# Hash, otherwise +source+ itself is the type. It counts as a server tool when
# its symbol form is one of +:special+, +:registry+, +:extension+ or +:mcp+.
# A nil source, or a type that cannot be converted with +#to_sym+, yields
# false instead of raising.
#
# @param source [Hash, Symbol, String, Object, nil] tool-call source descriptor
# @return [Boolean]
def server_tool_source?(source)
  return false if source.nil?

  type = source.is_a?(Hash) ? (source[:type] || source['type']) : source
  # Guard the conversion: a missing type, or one such as an Integer, has no #to_sym.
  return false unless type.respond_to?(:to_sym)

  %i[special registry extension mcp].include?(type.to_sym)
end