Module: Legion::Extensions::Llm::Streaming
- Extended by:
- Logging::Helper
- Includes:
- Logging::Helper
- Included in:
- Provider
- Defined in:
- lib/legion/extensions/llm/streaming.rb
Overview
Handles streaming responses from AI providers.
Defined Under Namespace
Modules: FaradayHandlers
Class Method Summary
- .build_on_data_handler ⇒ Object
- .build_stream_callback(accumulator, block) ⇒ Object
- .build_stream_error_response(parsed_data, env, status) ⇒ Object
- .error_chunk?(chunk) ⇒ Boolean
- .faraday_1? ⇒ Boolean
- .handle_data(data, env) ⇒ Object
- .handle_error_chunk(chunk, env) ⇒ Object
- .handle_error_event(data, env) ⇒ Object
- .handle_failed_response(chunk, buffer, env) ⇒ Object
- .handle_json_error_chunk(chunk, env) ⇒ Object
- .handle_parsed_error(parsed_data, env) ⇒ Object
- .handle_sse(chunk, parser, env) ⇒ Object
- .handle_stream(&block) ⇒ Object
- .json_error_payload?(chunk) ⇒ Boolean
- .parse_error_from_json(data, env, _error_message) ⇒ Object
- .parse_streaming_error(data) ⇒ Object
- .persist_failed_response_body(buffer, env) ⇒ Object
- .persist_failed_response_custom_body?(buffer, env) ⇒ Boolean
- .persist_failed_response_env_body?(buffer, env) ⇒ Boolean
- .process_stream_chunk(chunk, parser, env) ⇒ Object
- .raise_partial_streaming_error(buffer, env) ⇒ Object
- .stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
Class Method Details
.build_on_data_handler ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/legion/extensions/llm/streaming.rb', line 54
#
# Builds the handler object that receives streamed response data.
#
# Each call creates its own string buffer and its own
# EventStreamParser::Parser. Regular chunks are routed to
# process_stream_chunk together with the parser; chunks of a failed response
# are routed to handle_failed_response together with the buffer.
#
# The block is named explicitly so it can be captured by the chunk lambda.
#
# @yield forwarded to process_stream_chunk
# @return [Object] whatever FaradayHandlers.build returns
def build_on_data_handler(&block)
  failure_buffer = +''
  sse_parser = EventStreamParser::Parser.new

  chunk_handler = ->(chunk, env) { process_stream_chunk(chunk, sse_parser, env, &block) }
  failure_handler = ->(chunk, env) { handle_failed_response(chunk, failure_buffer, env) }

  FaradayHandlers.build(
    faraday_v1: faraday_1?,
    on_chunk: chunk_handler,
    on_failed_response: failure_handler
  )
end
.build_stream_callback(accumulator, block) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/legion/extensions/llm/streaming.rb', line 34
#
# Builds the per-chunk callback used while streaming.
#
# Every chunk is added to the accumulator first; the accumulator is then
# asked for a filtered version of the chunk, and only a truthy filtered chunk
# is handed to the consumer.
#
# @param accumulator [#add, #filtered_chunk] collects every streamed chunk
# @param block [#call, nil] consumer callback; may be nil when the caller
#   streams without a block, in which case chunks are only accumulated
# @return [Proc] callback taking a single chunk
def build_stream_callback(accumulator, block)
  proc do |chunk|
    accumulator.add chunk
    filtered = accumulator.filtered_chunk(chunk)
    # block is nil when no consumer block was given; accumulate only.
    block&.call(filtered) if filtered
  end
end
.build_stream_error_response(parsed_data, env, status) ⇒ Object
185 186 187 188 189 190 191 192 193 |
# File 'lib/legion/extensions/llm/streaming.rb', line 185
#
# Builds a response-like object carrying a parsed streaming error.
#
# The status is taken from +status+, then from +env.status+, then defaults
# to 500. On Faraday 1, or when no env is available, a small struct exposing
# +body+ and +status+ is returned; otherwise the env is merged with the new
# body and status.
#
# @param parsed_data [Object] parsed error payload used as the body
# @param env [#status, #merge, nil] request environment, may be nil
# @param status [Integer, nil] explicit status override
# @return [Object] struct (+body+, +status+) or the result of +env.merge+
def build_stream_error_response(parsed_data, env, status)
  error_status = status || env&.status || 500

  # env is treated as optional above (env&.status), so it cannot be merged
  # unconditionally; fall back to the struct when it is missing.
  if faraday_1? || env.nil?
    Struct.new(:body, :status).new(parsed_data, error_status)
  else
    env.merge(body: parsed_data, status: error_status)
  end
end
.error_chunk?(chunk) ⇒ Boolean
77 78 79 |
# File 'lib/legion/extensions/llm/streaming.rb', line 77
#
# Tells whether a raw chunk begins with an SSE error event line.
#
# @param chunk [String] raw streamed chunk
# @return [Boolean] true when the chunk starts with "event: error"
def error_chunk?(chunk)
  marker = 'event: error'
  chunk.start_with?(marker)
end
.faraday_1? ⇒ Boolean
50 51 52 |
# File 'lib/legion/extensions/llm/streaming.rb', line 50
#
# Tells whether the loaded Faraday is a 1.x release.
#
# The check includes the dot after the major version so that a version
# string such as "10.0.0" is not mistaken for a 1.x release.
#
# @return [Boolean]
def faraday_1?
  Faraday::VERSION.start_with?('1.')
end
.handle_data(data, env) ⇒ Object
151 152 153 154 155 156 157 158 |
# File 'lib/legion/extensions/llm/streaming.rb', line 151
#
# Parses one SSE data payload as JSON.
#
# A parsed Hash containing an "error" key is handed to handle_parsed_error;
# any other parsed value is returned as-is. A JSON parse failure is reported
# through handle_exception at warn level, and that call's result is returned.
#
# @param data [String] raw JSON text of the event
# @param env [Object] request environment passed through to error handling
# @return [Object] the parsed payload, or the result of the error/exception handler
def handle_data(data, env)
  payload = Legion::JSON.parse(data, symbolize_names: false)

  if payload.is_a?(Hash) && payload.key?('error')
    handle_parsed_error(payload, env)
  else
    payload
  end
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.handle_data')
end
.handle_error_chunk(chunk, env) ⇒ Object
89 90 91 92 |
# File 'lib/legion/extensions/llm/streaming.rb', line 89
#
# Extracts the JSON payload from a raw SSE error chunk and hands it to
# parse_error_from_json.
#
# The payload is taken from the first line that starts with "data:"; a
# single space after the colon is optional, and both LF and CRLF line endings
# are accepted. When the chunk carries no data line there is nothing to
# parse, so a warning is logged and nil is returned instead of failing on a
# missing line.
#
# @param chunk [String] raw chunk starting with an error event line
# @param env [Object] request environment passed through to error handling
# @return [Object, nil] result of parse_error_from_json, or nil without a data line
def handle_error_chunk(chunk, env)
  data_line = chunk.each_line(chomp: true).find { |line| line.start_with?('data:') }

  unless data_line
    log.warn '[llm][streaming] action=handle_error_chunk error event without data line'
    return nil
  end

  error_data = data_line.delete_prefix('data:').delete_prefix(' ')
  parse_error_from_json(error_data, env, 'Failed to parse error chunk')
end
.handle_error_event(data, env) ⇒ Object
160 161 162 |
# File 'lib/legion/extensions/llm/streaming.rb', line 160
#
# Handles the data of an SSE event whose type is "error" by delegating to
# parse_error_from_json.
#
# @param data [String] raw JSON text of the error event
# @param env [Object] request environment passed through to error handling
# @return [Object] whatever parse_error_from_json returns
def handle_error_event(data, env)
  failure_note = 'Failed to parse error event'
  parse_error_from_json(data, env, failure_note)
end
.handle_failed_response(chunk, buffer, env) ⇒ Object
94 95 96 97 98 99 100 101 102 103 |
# File 'lib/legion/extensions/llm/streaming.rb', line 94
#
# Handles one chunk of a failed (non-success) streaming response.
#
# The chunk is appended to the shared buffer, the buffer is persisted onto
# the env, and the buffer is then parsed as JSON and handed to
# handle_parsed_error. While the buffer is not yet valid JSON the parse
# fails: if the body was persisted the method returns nil quietly, otherwise
# raise_partial_streaming_error is invoked.
#
# @param chunk [String] newly received part of the error body
# @param buffer [String] mutable buffer accumulating the error body
# @param env [Object] request environment
# @return [Object, nil] result of handle_parsed_error, or nil for a persisted partial body
def handle_failed_response(chunk, buffer, env)
  buffer << chunk
  persisted = persist_failed_response_body(buffer, env)
  handle_parsed_error(Legion::JSON.parse(buffer, symbolize_names: false), env)
rescue Legion::JSON::ParseError
  raise_partial_streaming_error(buffer, env) unless persisted
end
.handle_json_error_chunk(chunk, env) ⇒ Object
85 86 87 |
# File 'lib/legion/extensions/llm/streaming.rb', line 85
#
# Handles a chunk that is itself a JSON error document by delegating to
# parse_error_from_json.
#
# @param chunk [String] raw chunk containing a JSON error payload
# @param env [Object] request environment passed through to error handling
# @return [Object] whatever parse_error_from_json returns
def handle_json_error_chunk(chunk, env)
  failure_note = 'Failed to parse JSON error chunk'
  parse_error_from_json(chunk, env, failure_note)
end
.handle_parsed_error(parsed_data, env) ⇒ Object
172 173 174 175 176 |
# File 'lib/legion/extensions/llm/streaming.rb', line 172
#
# Turns an already-parsed error payload into an error response and passes it
# to ErrorMiddleware.parse_error.
#
# The payload is re-serialised to JSON so parse_streaming_error can derive a
# status from it; only the status (first element of its result) is used.
#
# @param parsed_data [Object] parsed error payload
# @param env [Object] request environment used to build the error response
# @return [Object] whatever ErrorMiddleware.parse_error returns
def handle_parsed_error(parsed_data, env)
  status = parse_streaming_error(parsed_data.to_json).first
  response = build_stream_error_response(parsed_data, env, status)
  ErrorMiddleware.parse_error(provider: self, response: response)
end
.handle_sse(chunk, parser, env) ⇒ Object
140 141 142 143 144 145 146 147 148 149 |
# File 'lib/legion/extensions/llm/streaming.rb', line 140
#
# Feeds a raw chunk to the SSE parser and dispatches every parsed event.
#
# Events of type "error" go to handle_error_event. For all other events the
# data is run through handle_data and the result is yielded, except for the
# "[DONE]" sentinel, which is skipped.
#
# @param chunk [String] raw streamed chunk
# @param parser [#feed] SSE parser yielding (type, data) pairs
# @param env [Object] request environment passed through to the handlers
# @yield [Object] result of handle_data for each non-error event
# @return [Object] whatever parser.feed returns
def handle_sse(chunk, parser, env, &block)
  parser.feed(chunk) do |type, data|
    if type.to_sym == :error
      handle_error_event(data, env)
    elsif data != '[DONE]'
      yield handle_data(data, env, &block)
    end
  end
end
.handle_stream(&block) ⇒ Object
42 43 44 45 46 |
# File 'lib/legion/extensions/llm/streaming.rb', line 42
#
# Builds the on-data handler for a stream and wires the consumer into it.
#
# Only Hash payloads are converted with build_chunk and passed to the
# consumer; every other value coming out of the handler is ignored.
#
# @yield [Object] chunk built by build_chunk from each Hash payload
# @return [Object] whatever build_on_data_handler returns
def handle_stream(&block)
  build_on_data_handler do |data|
    next unless data.is_a?(Hash)

    block.call(build_chunk(data))
  end
end
.json_error_payload?(chunk) ⇒ Boolean
81 82 83 |
# File 'lib/legion/extensions/llm/streaming.rb', line 81
#
# Tells whether a raw chunk looks like a bare JSON error document: after
# leading whitespace it starts with "{" and it contains the text "\"error\"".
#
# @param chunk [String] raw streamed chunk
# @return [Boolean]
def json_error_payload?(chunk)
  return false unless chunk.lstrip.start_with?('{')

  chunk.include?('"error"')
end
.parse_error_from_json(data, env, _error_message) ⇒ Object
178 179 180 181 182 183 |
# File 'lib/legion/extensions/llm/streaming.rb', line 178
#
# Parses raw JSON error text and hands the parsed payload to
# handle_parsed_error.
#
# A JSON parse failure is reported through handle_exception at warn level,
# and that call's result is returned.
#
# @param data [String] raw JSON text describing the error
# @param env [Object] request environment passed through to error handling
# @param _error_message [String] description supplied by callers; currently unused
# @return [Object] result of handle_parsed_error, or of handle_exception on bad JSON
def parse_error_from_json(data, env, _error_message)
  parsed_data = Legion::JSON.parse(data, symbolize_names: false)
  handle_parsed_error(parsed_data, env)
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.parse_error_from_json')
end
.parse_streaming_error(data) ⇒ Object
164 165 166 167 168 169 170 |
# File 'lib/legion/extensions/llm/streaming.rb', line 164
#
# Derives a [status, message] pair from raw JSON error text.
#
# The status is always 500. The message is read from a top-level "message"
# key, then from a nested "error" => "message" key when "error" is a Hash,
# and otherwise defaults to "Unknown streaming error". JSON that does not
# decode to a Hash (array, string, null, ...) yields the default message
# instead of failing on the key lookup.
#
# A JSON parse failure is reported through handle_exception at warn level and
# produces a message that embeds the raw text.
#
# @param data [String] raw JSON text describing the error
# @return [Array(Integer, String)] status and message
def parse_streaming_error(data)
  error_data = Legion::JSON.parse(data, symbolize_names: false)

  message = nil
  if error_data.is_a?(Hash)
    nested = error_data['error']
    message = error_data['message']
    message ||= nested['message'] if nested.is_a?(Hash)
  end

  [500, message || 'Unknown streaming error']
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.parse_streaming_error')
  [500, "Failed to parse error: #{data}"]
end
.persist_failed_response_body(buffer, env) ⇒ Object
105 106 107 108 109 |
# File 'lib/legion/extensions/llm/streaming.rb', line 105
#
# Stores the failed-response buffer on the env through both persistence
# strategies. Both are always attempted, custom key first and env body
# second.
#
# @param buffer [String] accumulated error body
# @param env [Object] request environment
# @return [Boolean] true when at least one strategy stored the body
def persist_failed_response_body(buffer, env)
  outcomes = [
    persist_failed_response_custom_body?(buffer, env),
    persist_failed_response_env_body?(buffer, env)
  ]
  outcomes.any?
end
.persist_failed_response_custom_body?(buffer, env) ⇒ Boolean
118 119 120 121 122 123 124 125 |
# File 'lib/legion/extensions/llm/streaming.rb', line 118
#
# Stores a copy of the buffer on the env under
# ErrorMiddleware::STREAM_ERROR_BODY_KEY when the env supports +[]=+.
# Best effort: any StandardError raised along the way is swallowed and
# reported as "not persisted".
#
# @param buffer [String] accumulated error body
# @param env [Object] request environment
# @return [Boolean] true when the copy was stored, false otherwise
def persist_failed_response_custom_body?(buffer, env)
  if env.respond_to?(:[]=)
    env[ErrorMiddleware::STREAM_ERROR_BODY_KEY] = buffer.dup
    true
  else
    false
  end
rescue StandardError
  false
end
.persist_failed_response_env_body?(buffer, env) ⇒ Boolean
111 112 113 114 115 116 |
# File 'lib/legion/extensions/llm/streaming.rb', line 111
#
# Stores a copy of the buffer as the env body when the env supports +body=+.
#
# @param buffer [String] accumulated error body
# @param env [Object] request environment
# @return [Boolean] true when the copy was assigned, false when the env has no body writer
def persist_failed_response_env_body?(buffer, env)
  if env.respond_to?(:body=)
    env.body = buffer.dup
    true
  else
    false
  end
end
.process_stream_chunk(chunk, parser, env) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/legion/extensions/llm/streaming.rb', line 65
#
# Routes one raw streamed chunk to the matching handler.
#
# The chunk is logged at debug level when the log_stream_debug setting is
# on. Chunks starting with an SSE error event go to handle_error_chunk, bare
# JSON error documents go to handle_json_error_chunk, and everything else is
# fed through handle_sse. The return value of handle_sse is itself yielded
# once more after the per-event yields.
#
# @param chunk [String] raw streamed chunk
# @param parser [#feed] SSE parser
# @param env [Object] request environment
# @yield [Object] parsed event data, then the return value of handle_sse
# @return [Object] result of the handler that was chosen
def process_stream_chunk(chunk, parser, env, &block)
  log.debug { "Received chunk: #{chunk}" } if Legion::Extensions::Llm.config.log_stream_debug

  return handle_error_chunk(chunk, env) if error_chunk?(chunk)
  return handle_json_error_chunk(chunk, env) if json_error_payload?(chunk)

  yield handle_sse(chunk, parser, env, &block)
end
.raise_partial_streaming_error(buffer, env) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/legion/extensions/llm/streaming.rb', line 127
#
# Raises a ServerError for a failed response whose body could not be parsed.
#
# Up to 200 characters of a "message" value are pulled out of the partial
# body with a regular expression and included in the error text when found.
# The status comes from +env.status+ and falls back to 0. A warning with the
# status, the buffer length and the extracted message is logged before
# raising.
#
# @param buffer [String] partial error body
# @param env [#status, nil] request environment
# @raise [Legion::Extensions::Llm::ServerError] always
def raise_partial_streaming_error(buffer, env)
  status = env&.status || 0
  partial = buffer[/"message"\s*:\s*"([^"]{1,200})/, 1]

  msg = "Provider error (status #{status}) - response body incomplete"
  msg = "Provider error (status #{status}): #{partial}" if partial

  log.warn "[llm][streaming] action=handle_failed_response status=#{status} " \
           "partial_body=#{buffer.length}b msg=#{partial.inspect}"
  raise Legion::Extensions::Llm::ServerError, msg
end
.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/legion/extensions/llm/streaming.rb', line 13
#
# Posts the payload to the stream URL and consumes the streamed response.
#
# A StreamAccumulator collects every chunk. The per-chunk callback built by
# build_stream_callback is wrapped by handle_stream and installed as the
# request's on_data option: with hash-style assignment on Faraday 1 and
# through the accessor otherwise. Extra headers are merged underneath the
# request's own headers, so existing request headers win on conflict.
#
# NOTE(review): the rendered listing had lost several identifiers
# ("req.[:on_data]", "req..on_data", "= accumulator.(response)",
# "#{.content}"). `req.options` follows Faraday's request API. The `message`
# local, the `to_message` accumulator method and the final return line are
# reconstructed; confirm them against the real source file.
#
# @param connection [#post] HTTP connection used for the request
# @param payload [Object] request body
# @param additional_headers [Hash] extra headers; request headers take precedence
# @yield [Object] each filtered chunk as it arrives
# @return [Object] the message assembled by the accumulator (see NOTE above)
def stream_response(connection, payload, additional_headers = {}, &block)
  accumulator = StreamAccumulator.new

  response = connection.post stream_url, payload do |req|
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?
    on_chunk = build_stream_callback(accumulator, block)
    if Legion::Extensions::Llm.config.log_stream_debug
      log.debug { "Stream callback prepared: #{on_chunk.inspect}" }
    end
    if faraday_1?
      req.options[:on_data] = handle_stream(&on_chunk)
    else
      req.options.on_data = handle_stream(&on_chunk)
    end
  end

  message = accumulator.to_message(response)
  log.debug { "Stream completed: #{message.content}" }
  message
end