Module: Legion::Extensions::Llm::Streaming
- Extended by:
- Logging::Helper
- Includes:
- Logging::Helper
- Included in:
- Provider
- Defined in:
- lib/legion/extensions/llm/streaming.rb
Overview
Handles streaming responses from AI providers.
Defined Under Namespace
Modules: FaradayHandlers
Class Method Summary
- .build_on_data_handler ⇒ Object
- .build_stream_callback(accumulator, block) ⇒ Object
- .build_stream_error_response(parsed_data, env, status) ⇒ Object
- .error_chunk?(chunk) ⇒ Boolean
- .faraday_1? ⇒ Boolean
- .handle_data(data, env) ⇒ Object
- .handle_error_chunk(chunk, env) ⇒ Object
- .handle_error_event(data, env) ⇒ Object
- .handle_failed_response(chunk, buffer, env) ⇒ Object
- .handle_json_error_chunk(chunk, env) ⇒ Object
- .handle_parsed_error(parsed_data, env) ⇒ Object
- .handle_sse(chunk, parser, env) ⇒ Object
- .handle_stream(&block) ⇒ Object
- .json_error_payload?(chunk) ⇒ Boolean
- .parse_error_from_json(data, env, _error_message) ⇒ Object
- .parse_streaming_error(data) ⇒ Object
- .persist_failed_response_body(buffer, env) ⇒ Object
- .persist_failed_response_custom_body?(buffer, env) ⇒ Boolean
- .persist_failed_response_env_body?(buffer, env) ⇒ Boolean
- .process_stream_chunk(chunk, parser, env) ⇒ Object
- .raise_partial_streaming_error(buffer, env) ⇒ Object
- .stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
Class Method Details
.build_on_data_handler ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/legion/extensions/llm/streaming.rb', line 62
# Builds the Faraday on_data handler for one streaming request.
#
# Each call creates its own SSE parser and its own error-body buffer. The
# buffer is shared by every on_failed_response invocation of the returned
# handler, so handle_failed_response can append successive pieces of an
# error body. Successful chunks go through process_stream_chunk, which
# yields parsed payloads to the given block.
#
# @return whatever FaradayHandlers.build returns for the detected Faraday version
def build_on_data_handler(&)
  sse_parser = EventStreamParser::Parser.new
  failed_body = +''
  on_chunk = ->(chunk, env) { process_stream_chunk(chunk, sse_parser, env, &) }
  on_failed = ->(chunk, env) { handle_failed_response(chunk, failed_body, env) }

  FaradayHandlers.build(faraday_v1: faraday_1?, on_chunk: on_chunk, on_failed_response: on_failed)
end
.build_stream_callback(accumulator, block) ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/legion/extensions/llm/streaming.rb', line 37
# Wraps the caller's block in a proc that feeds every non-nil chunk to
# +accumulator+ and forwards the accumulator's filtered view of that chunk.
#
# @param accumulator [#add, #filtered_chunk] collects chunks for the final message
# @param block [Proc, nil] caller callback; nil when stream_response is called
#   without a block (stream_response itself uses +block&.call+)
# @return [Proc]
def build_stream_callback(accumulator, block)
  proc do |chunk|
    next unless chunk

    accumulator.add chunk
    filtered = accumulator.filtered_chunk(chunk)
    # Nil-safe like stream_response's own +block&.call+: without a caller
    # block the chunk is still accumulated instead of raising NoMethodError.
    block&.call(filtered) if filtered
  end
end
.build_stream_error_response(parsed_data, env, status) ⇒ Object
193 194 195 196 197 198 199 200 201 |
# File 'lib/legion/extensions/llm/streaming.rb', line 193
# Builds the response object handed to ErrorMiddleware for a streamed error.
#
# Status precedence: explicit +status+, then +env.status+, then 500.
# On Faraday 1.x, or when there is no env, a throwaway Struct exposing
# +body+ and +status+ is returned. Otherwise the env is merged with the
# parsed body and the resolved status.
def build_stream_error_response(parsed_data, env, status)
  resolved_status = status || env&.status || 500
  use_struct = faraday_1? || env.nil?
  return Struct.new(:body, :status).new(parsed_data, resolved_status) if use_struct

  env.merge(body: parsed_data, status: resolved_status)
end
.error_chunk?(chunk) ⇒ Boolean
85 86 87 |
# File 'lib/legion/extensions/llm/streaming.rb', line 85
# True when the raw chunk opens with an SSE "event: error" line.
#
# A plain prefix check is used (not a regex) so chunks that end in the
# middle of a multibyte character cannot raise an encoding error here.
#
# @param chunk [String] raw text received from the HTTP stream
# @return [Boolean]
def error_chunk?(chunk)
  error_marker = 'event: error'
  chunk.start_with?(error_marker)
end
.faraday_1? ⇒ Boolean
58 59 60 |
# File 'lib/legion/extensions/llm/streaming.rb', line 58
# True when the loaded Faraday gem is major version 1.
#
# Compares the whole major component rather than the first character, so a
# version string such as "10.0.0" is not mistaken for 1.x.
#
# @return [Boolean]
def faraday_1?
  Faraday::VERSION.split('.', 2).first == '1'
end
.handle_data(data, env) ⇒ Object
159 160 161 162 163 164 165 166 |
# File 'lib/legion/extensions/llm/streaming.rb', line 159
# Decodes the data field of one SSE event.
#
# A Hash with an 'error' key is routed to handle_parsed_error. Any other
# decoded value is returned as-is. Invalid JSON is reported through
# handle_exception at :warn level, and that call's result is returned.
#
# @param data [String] raw event data
# @param env [Object, nil] Faraday env of the streaming request
def handle_data(data, env)
  payload = Legion::JSON.parse(data, symbolize_names: false)
  if payload.is_a?(Hash) && payload.key?('error')
    handle_parsed_error(payload, env)
  else
    payload
  end
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.handle_data')
end
.handle_error_chunk(chunk, env) ⇒ Object
97 98 99 100 |
# File 'lib/legion/extensions/llm/streaming.rb', line 97
# Extracts the JSON payload of a raw chunk that starts with an SSE
# "event: error" line and passes it to parse_error_from_json.
#
# Only the first event in the chunk (up to the first blank line) is read.
# Every "data:" line of that event is collected, the optional single space
# after the colon is dropped, and multiple data lines are joined with "\n",
# as the SSE format specifies. Scanning for "data:" lines, rather than
# taking the second physical line, has two effects:
#   - a chunk holding only the event line no longer raises NoMethodError;
#   - an extra field such as "id:" before the data line is handled.
#
# @param chunk [String] raw chunk beginning with "event: error"
# @param env [Object, nil] Faraday env of the streaming request
def handle_error_chunk(chunk, env)
  event_lines = chunk.each_line(chomp: true).take_while { |line| !line.empty? }
  data_lines = event_lines.filter_map do |line|
    next unless line.start_with?('data:')

    line.delete_prefix('data:').delete_prefix(' ')
  end
  parse_error_from_json(data_lines.join("\n"), env, 'Failed to parse error chunk')
end
.handle_error_event(data, env) ⇒ Object
168 169 170 |
# File 'lib/legion/extensions/llm/streaming.rb', line 168
# Handles the data of an SSE event whose type is "error" by delegating to
# parse_error_from_json with an event-specific context label.
#
# @param data [String] raw event data
# @param env [Object, nil] Faraday env of the streaming request
def handle_error_event(data, env)
  failure_context = 'Failed to parse error event'
  parse_error_from_json(data, env, failure_context)
end
.handle_failed_response(chunk, buffer, env) ⇒ Object
102 103 104 105 106 107 108 109 110 111 |
# File 'lib/legion/extensions/llm/streaming.rb', line 102
# Called for each body chunk of a non-success streaming response.
#
# Appends +chunk+ to +buffer+, which the caller keeps across calls. It then
# persists the accumulated body via persist_failed_response_body and tries
# to parse the buffer as JSON. A complete JSON body goes to
# handle_parsed_error.
#
# On Legion::JSON::ParseError (body still incomplete):
#   - returns nil if the body could be persisted;
#   - otherwise calls raise_partial_streaming_error, which raises.
#
# The rescue deliberately spans the whole method body, as in the original.
#
# @param chunk [String] latest body fragment
# @param buffer [String] mutable accumulator for the error body
# @param env [Object, nil] Faraday env of the streaming request
def handle_failed_response(chunk, buffer, env)
  buffer << chunk
  persisted = persist_failed_response_body(buffer, env)
  handle_parsed_error(Legion::JSON.parse(buffer, symbolize_names: false), env)
rescue Legion::JSON::ParseError
  raise_partial_streaming_error(buffer, env) unless persisted
end
.handle_json_error_chunk(chunk, env) ⇒ Object
93 94 95 |
# File 'lib/legion/extensions/llm/streaming.rb', line 93
# Handles a chunk that looks like a bare JSON error body (not SSE framed)
# by delegating to parse_error_from_json with a chunk-specific context label.
#
# @param chunk [String] raw chunk text
# @param env [Object, nil] Faraday env of the streaming request
def handle_json_error_chunk(chunk, env)
  failure_context = 'Failed to parse JSON error chunk'
  parse_error_from_json(chunk, env, failure_context)
end
.handle_parsed_error(parsed_data, env) ⇒ Object
180 181 182 183 184 |
# File 'lib/legion/extensions/llm/streaming.rb', line 180
# Converts an already-decoded error payload into an error response and
# hands it to ErrorMiddleware.parse_error, returning that call's result.
#
# Only the status from parse_streaming_error is used; its message is
# discarded here.
#
# @param parsed_data [Object] decoded JSON error payload
# @param env [Object, nil] Faraday env of the streaming request
def handle_parsed_error(parsed_data, env)
  status, _message = parse_streaming_error(parsed_data.to_json)
  ErrorMiddleware.parse_error(
    provider: self,
    response: build_stream_error_response(parsed_data, env, status)
  )
end
.handle_sse(chunk, parser, env) ⇒ Object
148 149 150 151 152 153 154 155 156 157 |
# File 'lib/legion/extensions/llm/streaming.rb', line 148
# Feeds +chunk+ into the SSE +parser+ and dispatches each parsed event:
#   - "error" events go to handle_error_event;
#   - every other event, except the literal '[DONE]' sentinel, is decoded
#     by handle_data and the result is yielded to the caller's block.
#
# @return the return value of +parser.feed+
def handle_sse(chunk, parser, env, &)
  parser.feed(chunk) do |type, data|
    if type.to_sym == :error
      handle_error_event(data, env)
    elsif data != '[DONE]'
      yield handle_data(data, env)
    end
  end
end
.handle_stream(&block) ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/legion/extensions/llm/streaming.rb', line 47
# Builds the on_data handler for a stream. Every Hash payload it produces
# is turned into a chunk via build_chunk, and each non-nil chunk is passed
# to +block+. Non-Hash payloads are ignored.
def handle_stream(&block)
  build_on_data_handler do |payload|
    built = build_chunk(payload) if payload.is_a?(Hash)
    block.call(built) if built
  end
end
.json_error_payload?(chunk) ⇒ Boolean
89 90 91 |
# File 'lib/legion/extensions/llm/streaming.rb', line 89
# Heuristic for a bare JSON error body (not SSE framed). The chunk must:
#   - start with '{' after leading whitespace, and
#   - contain the literal text "error" in double quotes.
#
# @param chunk [String] raw chunk text
# @return [Boolean]
def json_error_payload?(chunk)
  looks_like_object = chunk.lstrip.start_with?('{')
  looks_like_object && chunk.include?('"error"')
end
.parse_error_from_json(data, env, _error_message) ⇒ Object
186 187 188 189 190 191 |
# File 'lib/legion/extensions/llm/streaming.rb', line 186
# Parses +data+ as JSON and routes the result through handle_parsed_error.
# A payload that is not valid JSON is reported via handle_exception at
# :warn level instead of raising.
#
# @param data [String] raw JSON text from the stream
# @param env [Object, nil] Faraday env of the streaming request
# @param _error_message [String] caller-supplied context label. It is unused
#   here but must stay, because handle_error_chunk, handle_error_event and
#   handle_json_error_chunk all pass it.
# @return result of handle_parsed_error, or of handle_exception on parse failure
def parse_error_from_json(data, env, _error_message)
  parsed_data = Legion::JSON.parse(data, symbolize_names: false)
  handle_parsed_error(parsed_data, env)
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.parse_error_from_json')
end
.parse_streaming_error(data) ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/legion/extensions/llm/streaming.rb', line 172
# Default extraction of [status, message] from a streamed error payload.
#
# The status is always 500 here. The message is taken from, in order:
#   1. a top-level 'message';
#   2. a nested 'error' Hash's 'message';
#   3. an 'error' String;
#   4. a bare non-empty JSON string.
# Otherwise it is 'Unknown streaming error'.
#
# Type checks prevent a TypeError on array or number payloads. They also
# stop String#[] from returning a bogus substring for bare JSON strings.
# Invalid JSON is reported via handle_exception at :warn level and yields a
# message that embeds the raw data.
#
# @param data [String] JSON text of the error payload
# @return [Array(Integer, Object)] status and message
def parse_streaming_error(data)
  error_data = Legion::JSON.parse(data, symbolize_names: false)
  message =
    case error_data
    when Hash
      nested = error_data['error']
      error_data['message'] ||
        (nested['message'] if nested.is_a?(Hash)) ||
        (nested if nested.is_a?(String))
    when String
      error_data unless error_data.empty?
    end
  [500, message || 'Unknown streaming error']
rescue Legion::JSON::ParseError => e
  handle_exception(e, level: :warn, handled: true, operation: 'llm.streaming.parse_streaming_error')
  [500, "Failed to parse error: #{data}"]
end
.persist_failed_response_body(buffer, env) ⇒ Object
113 114 115 116 117 |
# File 'lib/legion/extensions/llm/streaming.rb', line 113
# Stores the accumulated error body on +env+ in both supported places.
#
# Both writers are invoked unconditionally (no short-circuit), so the body
# is stored wherever possible. The result is truthy if either writer
# succeeded.
#
# @param buffer [String] accumulated error body
# @param env [Object, nil] Faraday env of the streaming request
def persist_failed_response_body(buffer, env)
  outcomes = [
    persist_failed_response_custom_body?(buffer, env),
    persist_failed_response_env_body?(buffer, env)
  ]
  outcomes.reduce { |any_ok, ok| any_ok || ok }
end
.persist_failed_response_custom_body?(buffer, env) ⇒ Boolean
126 127 128 129 130 131 132 133 |
# File 'lib/legion/extensions/llm/streaming.rb', line 126
# Stores a copy of +buffer+ under ErrorMiddleware::STREAM_ERROR_BODY_KEY on
# any +env+ that supports []=.
#
# @return [Boolean] true on success; false when env has no []= or the
#   write raises a StandardError (e.g. a frozen env)
def persist_failed_response_custom_body?(buffer, env)
  if env.respond_to?(:[]=)
    env[ErrorMiddleware::STREAM_ERROR_BODY_KEY] = buffer.dup
    true
  else
    false
  end
rescue StandardError
  false
end
.persist_failed_response_env_body?(buffer, env) ⇒ Boolean
119 120 121 122 123 124 |
# File 'lib/legion/extensions/llm/streaming.rb', line 119
# Stores a copy of +buffer+ as +env.body+ when env has a body writer.
#
# Failures are rescued and reported as false, matching
# persist_failed_response_custom_body?. Without this, a raising body= would
# escape handle_failed_response before its JSON parse and partial-error
# handling could run.
#
# @return [Boolean] true on success; false when env has no body= or the
#   write raises a StandardError
def persist_failed_response_env_body?(buffer, env)
  return false unless env.respond_to?(:body=)

  env.body = buffer.dup
  true
rescue StandardError
  false
end
.process_stream_chunk(chunk, parser, env) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/legion/extensions/llm/streaming.rb', line 73
# Routes one raw chunk from the HTTP stream:
#   - chunks opening with "event: error" go to handle_error_chunk;
#   - bare JSON error bodies go to handle_json_error_chunk;
#   - everything else is parsed as SSE by handle_sse, which yields each
#     decoded payload to the block.
#
# @param chunk [String] raw chunk text
# @param parser [Object] SSE parser shared across chunks of one request
# @param env [Object, nil] Faraday env of the streaming request
def process_stream_chunk(chunk, parser, env, &)
  log.debug { "Received chunk: #{chunk}" } if Legion::Extensions::Llm.config.log_stream_debug

  if error_chunk?(chunk)
    handle_error_chunk(chunk, env)
  elsif json_error_payload?(chunk)
    handle_json_error_chunk(chunk, env)
  else
    # handle_sse already yields every decoded payload. Its return value is
    # whatever parser.feed returns, which is not a payload, so it must not
    # be yielded to the block a second time.
    handle_sse(chunk, parser, env, &)
  end
end
.raise_partial_streaming_error(buffer, env) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/legion/extensions/llm/streaming.rb', line 135
# Always raises Legion::Extensions::Llm::ServerError for an error body that
# could not be parsed or persisted.
#
# The message includes the HTTP status (0 when unknown). When one can be
# scraped from the partial body, it also includes up to 200 characters of a
# "message" value. A warning is logged before raising.
#
# @param buffer [String] partial error body received so far
# @param env [Object, nil] Faraday env of the streaming request
# @raise [Legion::Extensions::Llm::ServerError]
def raise_partial_streaming_error(buffer, env)
  partial = buffer[/"message"\s*:\s*"([^"]{1,200})/, 1]
  status = env&.status || 0
  suffix = partial ? ": #{partial}" : ' - response body incomplete'
  msg = "Provider error (status #{status})#{suffix}"
  log.warn "[llm][streaming] action=handle_failed_response status=#{status} " \
           "partial_body=#{buffer.length}b msg=#{partial.inspect}"
  raise Legion::Extensions::Llm::ServerError, msg
end
.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/legion/extensions/llm/streaming.rb', line 13
# Performs a streaming POST to +stream_url+ and returns the assembled message.
#
# Every chunk is fed to a StreamAccumulator, and its filtered form is passed
# to +block+ (when given). After the request completes:
#   - any text still held by the accumulator is flushed to +block+;
#   - the accumulator builds the final message from the response.
#
# @param connection [#post] Faraday-style connection
# @param payload [Object] request body
# @param additional_headers [Hash] merged beneath the request's own headers;
#   existing request headers win on conflict
# @yieldparam chunk filtered chunk produced by the accumulator
# @return [Object] the accumulator's message for the completed response
def stream_response(connection, payload, additional_headers = {}, &block)
  accumulator = StreamAccumulator.new

  response = connection.post stream_url, payload do |req|
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?
    on_chunk = build_stream_callback(accumulator, block)
    log.debug { "Stream callback prepared: #{on_chunk.inspect}" } if Legion::Extensions::Llm.config.log_stream_debug
    # Faraday 1.x is wired through options[:on_data]; later versions through options.on_data.
    if faraday_1?
      req.options[:on_data] = handle_stream(&on_chunk)
    else
      req.options.on_data = handle_stream(&on_chunk)
    end
  end

  # Release any text held by the untagged-preamble heuristic so short
  # responses still stream at least one delta to the caller.
  final_chunk = accumulator.flush_pending_chunk
  block&.call(final_chunk) if final_chunk

  message = accumulator.to_message(response)
  log.debug { "Stream completed: #{message.content}" }
  message
end