Module: Legion::Extensions::Llm::Streaming

Included in:
Provider
Defined in:
lib/legion/extensions/llm/streaming.rb

Overview

Handles streaming responses from AI providers.

Defined Under Namespace

Modules: FaradayHandlers

Class Method Summary collapse

Class Method Details

.build_on_data_handler ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/legion/extensions/llm/streaming.rb', line 48

# Builds the Faraday `on_data` handler used for streaming responses.
#
# A fresh EventStreamParser::Parser and an empty, mutable String buffer are
# created per call and captured by the two lambdas handed to
# FaradayHandlers.build: the parser is passed to #process_stream_chunk,
# the buffer is passed to #handle_failed_response.
#
# The block parameter is named on purpose: forwarding an anonymous `&` from
# inside a lambda/block is ambiguous and is rejected as a SyntaxError by some
# Ruby releases (notably 3.3.0), so the handler is forwarded explicitly.
#
# @yield [data] forwarded to #process_stream_chunk for every chunk
# @return [Object] whatever FaradayHandlers.build returns
def build_on_data_handler(&handler)
  buffer = +''
  parser = EventStreamParser::Parser.new

  FaradayHandlers.build(
    faraday_v1: faraday_1?,
    on_chunk: ->(chunk, env) { process_stream_chunk(chunk, parser, env, &handler) },
    on_failed_response: ->(chunk, env) { handle_failed_response(chunk, buffer, env) }
  )
end

.build_stream_callback(accumulator, block) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/legion/extensions/llm/streaming.rb', line 28

# Wraps the caller's block in a proc that records every chunk in the
# accumulator before passing the filtered version on.
#
# @param accumulator [#add, #filtered_chunk] collects every chunk
# @param block [#call] receives the filtered chunk when it is truthy
# @return [Proc] callback taking a single chunk; returns nil when the
#   accumulator filters the chunk out
def build_stream_callback(accumulator, block)
  proc do |incoming|
    accumulator.add(incoming)
    visible = accumulator.filtered_chunk(incoming)
    next unless visible

    block.call(visible)
  end
end

.build_stream_error_response(parsed_data, env, status) ⇒ Object



143
144
145
146
147
148
149
150
151
# File 'lib/legion/extensions/llm/streaming.rb', line 143

# Builds a response-like object carrying a streaming error payload.
#
# The status is resolved in order: the explicit +status+, then +env.status+
# (when an env is present), then 500.
#
# On Faraday 1 a minimal Struct exposing +body+ and +status+ is returned.
# The same Struct is used when +env+ is nil: the status lookup already
# tolerates a missing env, so the builder must not crash on +env.merge+.
# Otherwise the env is merged with the new body and status.
#
# @param parsed_data [Object] decoded error payload, used as the body
# @param env [#status, #merge, nil] Faraday env of the streaming request
# @param status [Integer, nil] explicit status overriding the env's status
# @return [Object] object responding to +body+ and +status+, or the result
#   of +env.merge+
def build_stream_error_response(parsed_data, env, status)
  error_status = status || env&.status || 500

  # No env to merge into (or Faraday 1): fall back to a plain value object.
  return Struct.new(:body, :status).new(parsed_data, error_status) if faraday_1? || env.nil?

  env.merge(body: parsed_data, status: error_status)
end

.error_chunk?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/legion/extensions/llm/streaming.rb', line 73

# Tells whether a raw chunk begins with the literal text "event: error".
#
# @param chunk [String] raw chunk as received from the stream
# @return [Boolean]
def error_chunk?(chunk) = chunk.start_with?('event: error')

.faraday_1? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/legion/extensions/llm/streaming.rb', line 44

# Reports whether the loaded Faraday version string begins with "1".
#
# @return [Boolean]
def faraday_1?
  version = Faraday::VERSION
  version.start_with?('1')
end

.handle_data(data, env) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/legion/extensions/llm/streaming.rb', line 109

# Decodes one data payload as JSON (string keys).
#
# A Hash containing an 'error' key is routed to #handle_parsed_error; any
# other decoded value is returned unchanged. When the payload is not valid
# JSON the failure is logged at debug level and the logger call's result is
# returned.
#
# @param data [String] JSON text of a single event
# @param env [Object] Faraday env, passed through to the error handler
# @return [Object] decoded payload, or the error handler's / logger's result
def handle_data(data, env)
  payload = Legion::JSON.parse(data, symbolize_names: false)
  error_payload = payload.is_a?(Hash) && payload.key?('error')
  error_payload ? handle_parsed_error(payload, env) : payload
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "Failed to parse data chunk: #{e.message}" }
end

.handle_error_chunk(chunk, env) ⇒ Object



85
86
87
88
# File 'lib/legion/extensions/llm/streaming.rb', line 85

# Handles a raw chunk that starts with "event: error".
#
# The first line beginning with "data:" is located, the "data:" field name
# and one optional following space are removed, and the remainder is handed
# to #parse_error_from_json.
#
# A chunk without any data line (for example when the event line arrives in
# its own network chunk) is logged at debug level instead of failing with
# NoMethodError on nil.
#
# @param chunk [String] raw chunk beginning with "event: error"
# @param env [Object] Faraday env, passed through to the error handler
# @return [Object] result of #parse_error_from_json, or of the logger call
#   when no data line is present
def handle_error_chunk(chunk, env)
  data_line = chunk.each_line(chomp: true).find { |line| line.start_with?('data:') }

  unless data_line
    return Legion::Extensions::Llm.logger.debug { "Failed to parse error chunk: no data line in #{chunk.inspect}" }
  end

  # Strip the field name, then at most one separating space.
  error_data = data_line.delete_prefix('data:').delete_prefix(' ')
  parse_error_from_json(error_data, env, 'Failed to parse error chunk')
end

.handle_error_event(data, env) ⇒ Object



118
119
120
# File 'lib/legion/extensions/llm/streaming.rb', line 118

# Delegates the data of an error-typed event to #parse_error_from_json.
#
# @param data [String] JSON text of the error event
# @param env [Object] Faraday env, passed through unchanged
# @return [Object] result of #parse_error_from_json
def handle_error_event(data, env) = parse_error_from_json(data, env, 'Failed to parse error event')

.handle_failed_response(chunk, buffer, env) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/legion/extensions/llm/streaming.rb', line 90

# Accumulates the body of a failed response and reports it once complete.
#
# Each chunk is appended to +buffer+ (mutated in place). As soon as the
# buffer decodes as JSON the decoded payload goes to #handle_parsed_error;
# until then the parse failure is only logged at debug level.
#
# @param chunk [String] newly received body fragment
# @param buffer [String] mutable accumulator shared across calls
# @param env [Object] Faraday env, passed through to the error handler
# @return [Object] result of #handle_parsed_error or of the logger call
def handle_failed_response(chunk, buffer, env)
  buffer << chunk
  handle_parsed_error(Legion::JSON.parse(buffer, symbolize_names: false), env)
rescue Legion::JSON::ParseError
  Legion::Extensions::Llm.logger.debug { "Accumulating error chunk: #{chunk}" }
end

.handle_json_error_chunk(chunk, env) ⇒ Object



81
82
83
# File 'lib/legion/extensions/llm/streaming.rb', line 81

# Delegates a chunk that is itself a JSON error document to
# #parse_error_from_json.
#
# @param chunk [String] raw JSON chunk
# @param env [Object] Faraday env, passed through unchanged
# @return [Object] result of #parse_error_from_json
def handle_json_error_chunk(chunk, env) = parse_error_from_json(chunk, env, 'Failed to parse JSON error chunk')

.handle_parsed_error(parsed_data, env) ⇒ Object



130
131
132
133
134
# File 'lib/legion/extensions/llm/streaming.rb', line 130

# Turns a decoded error payload into an error response and hands it to
# ErrorMiddleware.parse_error with this object as the provider.
#
# Only the status from #parse_streaming_error is used; the message it
# returns is discarded here.
# NOTE(review): ErrorMiddleware.parse_error presumably raises a provider
# specific error - confirm against its definition.
#
# @param parsed_data [#to_json] decoded error payload
# @param env [Object] Faraday env of the streaming request
# @return [Object] result of ErrorMiddleware.parse_error
def handle_parsed_error(parsed_data, env)
  status, = parse_streaming_error(parsed_data.to_json)
  ErrorMiddleware.parse_error(
    provider: self,
    response: build_stream_error_response(parsed_data, env, status)
  )
end

.handle_sse(chunk, parser, env) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/legion/extensions/llm/streaming.rb', line 98

# Feeds a raw chunk to the event-stream parser and dispatches each event.
#
# Events whose type is :error go to #handle_error_event. Every other event
# is decoded by #handle_data and the result is yielded to the caller's
# block, except for the '[DONE]' sentinel, which is skipped.
#
# The block parameter is named on purpose: forwarding an anonymous `&` from
# inside the parser.feed block is ambiguous and is rejected as a SyntaxError
# by some Ruby releases (notably 3.3.0).
#
# @param chunk [String] raw chunk from the stream
# @param parser [#feed] event-stream parser yielding (type, data) pairs
# @param env [Object] Faraday env, passed through to the handlers
# @yield [data] decoded payload of each non-error event
# @return [Object] whatever parser.feed returns
def handle_sse(chunk, parser, env, &block)
  parser.feed(chunk) do |type, data|
    case type.to_sym
    when :error
      handle_error_event(data, env)
    else
      # The block is still forwarded to handle_data, as before.
      yield handle_data(data, env, &block) unless data == '[DONE]'
    end
  end
end

.handle_stream(&block) ⇒ Object



36
37
38
39
40
# File 'lib/legion/extensions/llm/streaming.rb', line 36

# Builds the on_data handler for a stream: every Hash payload is turned into
# a chunk via #build_chunk and passed to the given block; any other payload
# is ignored.
#
# @yield [chunk] the result of #build_chunk for each Hash payload
# @return [Object] whatever #build_on_data_handler returns
def handle_stream(&block)
  build_on_data_handler do |payload|
    next unless payload.is_a?(Hash)

    block.call(build_chunk(payload))
  end
end

.json_error_payload?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/legion/extensions/llm/streaming.rb', line 77

# Cheap textual check for a chunk that looks like a JSON error document:
# after leading whitespace it opens with '{' and the text contains the
# quoted key "error" somewhere.
#
# @param chunk [String] raw chunk from the stream
# @return [Boolean]
def json_error_payload?(chunk)
  return false unless chunk.lstrip.start_with?('{')

  chunk.include?('"error"')
end

.parse_error_from_json(data, env, error_message) ⇒ Object



136
137
138
139
140
141
# File 'lib/legion/extensions/llm/streaming.rb', line 136

# Decodes +data+ as JSON (string keys) and passes the result to
# #handle_parsed_error. When decoding fails, +error_message+ and the parser's
# message are logged at debug level instead.
#
# @param data [String] JSON text describing the error
# @param env [Object] Faraday env, passed through to the error handler
# @param error_message [String] prefix for the debug log on parse failure
# @return [Object] result of #handle_parsed_error or of the logger call
def parse_error_from_json(data, env, error_message)
  handle_parsed_error(Legion::JSON.parse(data, symbolize_names: false), env)
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "#{error_message}: #{e.message}" }
end

.parse_streaming_error(data) ⇒ Object



122
123
124
125
126
127
128
# File 'lib/legion/extensions/llm/streaming.rb', line 122

# Extracts a [status, message] pair from the JSON text of a streaming error.
#
# The status is always 500 in this implementation. The message is the
# top-level 'message' value when the document is a Hash that has one;
# otherwise 'Unknown streaming error'. Non-Hash documents (arrays, numbers)
# are handled explicitly because indexing them with a String key raises
# TypeError.
#
# @param data [String] JSON text describing the error
# @return [Array(Integer, String)] status and message; on invalid JSON the
#   message embeds the raw +data+
def parse_streaming_error(data)
  error_data = Legion::JSON.parse(data, symbolize_names: false)
  message = error_data['message'] if error_data.is_a?(Hash)
  [500, message || 'Unknown streaming error']
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "Failed to parse streaming error: #{e.message}" }
  [500, "Failed to parse error: #{data}"]
end

.process_stream_chunk(chunk, parser, env) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/llm/streaming.rb', line 59

# Entry point for every raw chunk received from the stream.
#
# Optionally logs the chunk (when config.log_stream_debug is set), then
# routes it: chunks starting with an error event go to #handle_error_chunk,
# bare JSON error documents go to #handle_json_error_chunk, and everything
# else is parsed by #handle_sse.
#
# NOTE(review): besides the per-event yields performed inside #handle_sse,
# the return value of #handle_sse itself is yielded to the block once more.
# The block built in #handle_stream ignores non-Hash values, so this looks
# harmless there - confirm before relying on it elsewhere.
#
# @param chunk [String] raw chunk from the stream
# @param parser [Object] event-stream parser handed to #handle_sse
# @param env [Object] Faraday env, passed through to the handlers
# @yield [data] decoded event payloads, plus #handle_sse's return value
# @return [Object] result of the selected handler or of the final yield
def process_stream_chunk(chunk, parser, env, &)
  llm = Legion::Extensions::Llm
  llm.logger.debug { "Received chunk: #{chunk}" } if llm.config.log_stream_debug

  return handle_error_chunk(chunk, env) if error_chunk?(chunk)
  return handle_json_error_chunk(chunk, env) if json_error_payload?(chunk)

  yield handle_sse(chunk, parser, env, &)
end

.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/extensions/llm/streaming.rb', line 10

# Posts +payload+ to the provider's stream URL and consumes the streamed
# reply, passing filtered chunks to the block as they arrive.
#
# When +additional_headers+ is non-empty it is merged underneath the
# request's existing headers (existing request headers win on conflict).
# The on_data handler is installed with Hash-style access on Faraday 1 and
# through the options accessor otherwise.
#
# @param connection [#post] connection used to issue the request
# @param payload [Object] request body
# @param additional_headers [Hash] extra headers for this request
# @yield [chunk] each filtered chunk, via #build_stream_callback
# @return [Object] the message assembled by StreamAccumulator#to_message
def stream_response(connection, payload, additional_headers = {}, &block)
  accumulator = StreamAccumulator.new

  response = connection.post(stream_url, payload) do |req|
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?

    on_data = handle_stream(&build_stream_callback(accumulator, block))
    if faraday_1?
      req.options[:on_data] = on_data
    else
      req.options.on_data = on_data
    end
  end

  accumulator.to_message(response).tap do |message|
    Legion::Extensions::Llm.logger.debug { "Stream completed: #{message.content}" }
  end
end