Class: Legion::Extensions::Llm::StreamAccumulator

Inherits:
Object
  • Object
show all
Defined in:
lib/legion/extensions/llm/stream_accumulator.rb

Overview

Assembles streaming responses from LLMs into complete messages.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ StreamAccumulator

Returns a new instance of StreamAccumulator.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 10

# Sets up an empty accumulator: unfrozen String buffers for streamed text,
# an empty tool-call table, and nil for every token counter.
#
# Every instance variable read by the public methods is assigned here,
# including @model_id and the two per-chunk deltas. Without that,
# #filtered_chunk would call +empty?+ on nil (NoMethodError) if it were
# reached before the first #add call.
#
# @return [StreamAccumulator] a new instance of StreamAccumulator
def initialize
  @model_id = nil # filled in by #add from the first chunk that carries one

  # Unary plus yields unfrozen Strings even under frozen_string_literal.
  @content = +''
  @thinking_text = +''
  @thinking_signature = nil

  # Per-chunk deltas; #add resets them and #filtered_chunk reads them.
  @last_content_delta = +''
  @last_thinking_delta = +''

  @tool_calls = {}
  @latest_tool_call_id = nil

  # Token counters stay nil until a value is recorded.
  @input_tokens = nil
  @output_tokens = nil
  @cached_tokens = nil
  @cache_creation_tokens = nil
  @thinking_tokens = nil

  # NOTE(review): presumably parser state for inline think tags; the code
  # that consumes these two is outside this view - confirm.
  @inside_think_tag = false
  @pending_think_tag = +''
end

Instance Attribute Details

#content ⇒ Object (readonly)

Returns the value of attribute content.



8
9
10
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 8

# Reader for @content, which the constructor initialises to an empty,
# unfrozen String.
#
# @return [Object] the current value of @content
def content = @content

#model_id ⇒ Object (readonly)

Returns the value of attribute model_id.



8
9
10
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 8

# Reader for @model_id, which #add assigns from the first chunk whose
# +model_id+ is truthy.
#
# @return [Object, nil] the current value of @model_id
def model_id = @model_id

#tool_calls ⇒ Object (readonly)

Returns the value of attribute tool_calls.



8
9
10
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 8

# Reader for @tool_calls, which the constructor initialises to an empty
# Hash.
#
# @return [Object] the current value of @tool_calls
def tool_calls = @tool_calls

Instance Method Details

#add(chunk) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 25

# Feeds one streamed chunk into the accumulator.
#
# Records the model id from the first chunk that carries one, resets the
# per-chunk deltas, then hands the chunk to the content, thinking and token
# handlers in that order. When +log_stream_debug+ is enabled, the incoming
# chunk is logged before processing and the accumulator itself afterwards.
#
# @param chunk [Object] a streamed chunk; must respond to +model_id+
# @return [Object, nil] nil when debug logging is off, otherwise whatever
#   the logger's +debug+ call returns
def add(chunk)
  llm = Legion::Extensions::Llm
  llm.logger.debug { chunk.inspect } if llm.config.log_stream_debug

  @model_id ||= chunk.model_id

  # Start each chunk with empty deltas; #filtered_chunk reads them later.
  @last_content_delta = +''
  @last_thinking_delta = +''

  handle_chunk_content(chunk)
  append_thinking_from_chunk(chunk)
  count_tokens(chunk)

  # The flag is read a second time rather than cached, as before.
  llm.logger.debug { inspect } if llm.config.log_stream_debug
end

#filtered_chunk(chunk) ⇒ Object

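Builds a Chunk from the deltas recorded by the most recent #add call, or returns nil when there is no content delta, no thinking delta, no tool call and no positive token count. (The method carries a `rubocop:disable Metrics/PerceivedComplexity` directive.)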



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 37

# Builds a new Chunk from the per-chunk deltas (@last_content_delta and
# @last_thinking_delta, which #add resets before processing each chunk)
# plus fields copied from +chunk+.
#
# Returns nil when there is nothing to report: both deltas are empty,
# +chunk+ is not a tool call, and neither token count is positive.
#
# @param chunk [Object] the streamed chunk; must respond to +tool_call?+,
#   +thinking+, +model_id+, +tool_calls+, +input_tokens+, +output_tokens+
#   and +raw+
# @return [Chunk, nil] the filtered chunk, or nil if it would be empty
def filtered_chunk(chunk) # rubocop:disable Metrics/PerceivedComplexity
  text = @last_content_delta
  thought = @last_thinking_delta
  usage_reported = chunk.input_tokens&.positive? || chunk.output_tokens&.positive?

  return nil if text.empty? && thought.empty? && !chunk.tool_call? && !usage_reported

  # A thinking delta is wrapped on its own; otherwise the incoming chunk's
  # thinking value is passed through untouched.
  thinking = thought.empty? ? chunk.thinking : Thinking.build(text: thought)

  Chunk.new(
    role: :assistant,
    content: (text unless text.empty?),
    thinking: thinking,
    model_id: chunk.model_id,
    tool_calls: chunk.tool_calls,
    input_tokens: chunk.input_tokens,
    output_tokens: chunk.output_tokens,
    raw: chunk.raw
  )
end

#to_message(response) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/legion/extensions/llm/stream_accumulator.rb', line 55

# Packages everything accumulated so far into a single assistant Message.
#
# Empty content and empty thinking text are reported as nil rather than as
# empty strings. Token counters are passed to Tokens.build as they are.
#
# @param response [Object] stored unchanged as the message's +raw+ value
# @return [Message] the assembled assistant message
def to_message(response)
  thinking = Thinking.build(
    text: (@thinking_text unless @thinking_text.empty?),
    signature: @thinking_signature
  )
  tokens = Tokens.build(
    input: @input_tokens,
    output: @output_tokens,
    cached: @cached_tokens,
    cache_creation: @cache_creation_tokens,
    thinking: @thinking_tokens
  )

  Message.new(
    role: :assistant,
    content: (content unless content.empty?),
    thinking: thinking,
    tokens: tokens,
    model_id: model_id,
    tool_calls: tool_calls_from_stream,
    raw: response
  )
end