Module: Legion::Extensions::Llm::Streaming
- Included in:
- Provider
- Defined in:
- lib/legion/extensions/llm/streaming.rb
Overview
Handles streaming responses from AI providers.
Defined Under Namespace
Modules: FaradayHandlers
Class Method Summary
- .build_on_data_handler ⇒ Object
- .build_stream_callback(accumulator, block) ⇒ Object
- .build_stream_error_response(parsed_data, env, status) ⇒ Object
- .error_chunk?(chunk) ⇒ Boolean
- .faraday_1? ⇒ Boolean
- .handle_data(data, env) ⇒ Object
- .handle_error_chunk(chunk, env) ⇒ Object
- .handle_error_event(data, env) ⇒ Object
- .handle_failed_response(chunk, buffer, env) ⇒ Object
- .handle_json_error_chunk(chunk, env) ⇒ Object
- .handle_parsed_error(parsed_data, env) ⇒ Object
- .handle_sse(chunk, parser, env) ⇒ Object
- .handle_stream(&block) ⇒ Object
- .json_error_payload?(chunk) ⇒ Boolean
- .parse_error_from_json(data, env, error_message) ⇒ Object
- .parse_streaming_error(data) ⇒ Object
- .process_stream_chunk(chunk, parser, env) ⇒ Object
- .stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
Class Method Details
.build_on_data_handler ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/legion/extensions/llm/streaming.rb', line 48
# Builds the Faraday data callback used while streaming a response.
#
# Each call creates its own SSE parser and its own error buffer. The buffer is
# shared by every on_failed_response invocation of this handler, so an error
# body that arrives in several pieces can be accumulated (see
# #handle_failed_response).
#
# @yield [data] forwarded to #process_stream_chunk for each parsed event
# @return [Object] whatever FaradayHandlers.build returns
def build_on_data_handler(&on_data)
  error_buffer = +''
  sse_parser = EventStreamParser::Parser.new

  chunk_handler = ->(chunk, env) { process_stream_chunk(chunk, sse_parser, env, &on_data) }
  failure_handler = ->(chunk, env) { handle_failed_response(chunk, error_buffer, env) }

  FaradayHandlers.build(
    faraday_v1: faraday_1?,
    on_chunk: chunk_handler,
    on_failed_response: failure_handler
  )
end
.build_stream_callback(accumulator, block) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/legion/extensions/llm/streaming.rb', line 28
# Returns a proc that records every raw chunk in the accumulator and
# forwards only the accumulator's filtered view of it to the caller.
#
# @param accumulator [#add, #filtered_chunk] collects chunks for the final message
# @param block [#call] the caller's chunk consumer
# @return [Proc] invoked with each chunk; returns nil when the filtered value is falsy
def build_stream_callback(accumulator, block)
  proc do |raw_chunk|
    accumulator.add(raw_chunk)
    visible = accumulator.filtered_chunk(raw_chunk)
    next unless visible

    block.call(visible)
  end
end
.build_stream_error_response(parsed_data, env, status) ⇒ Object
143 144 145 146 147 148 149 150 151 |
# File 'lib/legion/extensions/llm/streaming.rb', line 143
# Builds a response-like object that ErrorMiddleware can inspect.
#
# The status is the first of: the explicit `status`, `env.status`, then 500.
# Under Faraday 1 a small body/status Struct is returned; otherwise the
# Faraday env is merged with the parsed body and resolved status.
#
# @param parsed_data [Object] decoded error payload
# @param env [Object, nil] Faraday env (may be nil)
# @param status [Integer, nil] status derived from the payload
def build_stream_error_response(parsed_data, env, status)
  resolved_status = status || env&.status || 500
  return env.merge(body: parsed_data, status: resolved_status) unless faraday_1?

  Struct.new(:body, :status).new(parsed_data, resolved_status)
end
.error_chunk?(chunk) ⇒ Boolean
73 74 75 |
# File 'lib/legion/extensions/llm/streaming.rb', line 73
# Whether a raw chunk begins with an SSE `event: error` line.
#
# @param chunk [String]
# @return [Boolean]
def error_chunk?(chunk)
  chunk.match?(/\Aevent: error/)
end
.faraday_1? ⇒ Boolean
44 45 46 |
# File 'lib/legion/extensions/llm/streaming.rb', line 44
# Whether the loaded Faraday gem is a 1.x release.
#
# Compares the major version segment exactly. A plain `start_with?('1')`
# prefix test would also treat a 10.x/11.x release as Faraday 1.
#
# @return [Boolean]
def faraday_1?
  Faraday::VERSION.to_s.split('.', 2).first == '1'
end
.handle_data(data, env) ⇒ Object
109 110 111 112 113 114 115 116 |
# File 'lib/legion/extensions/llm/streaming.rb', line 109
# Parses one SSE `data:` payload.
#
# @param data [String] JSON text carried by the event
# @param env [Object, nil] Faraday env, passed through to error handling
# @return [Object, nil] the parsed payload. When the payload is a Hash with an
#   "error" key, the result of #handle_parsed_error. nil when the payload is
#   not valid JSON.
def handle_data(data, env)
  parsed = Legion::JSON.parse(data, symbolize_names: false)
  return parsed unless parsed.is_a?(Hash) && parsed.key?('error')

  handle_parsed_error(parsed, env)
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "Failed to parse data chunk: #{e.message}" }
  # Return nil explicitly instead of leaking the logger's return value.
  nil
end
.handle_error_chunk(chunk, env) ⇒ Object
85 86 87 88 |
# File 'lib/legion/extensions/llm/streaming.rb', line 85
# Handles a raw chunk that starts with `event: error`.
#
# Extracts the payload of the first `data:` line and hands it to
# #parse_error_from_json. It looks for that line anywhere in the chunk (not
# just the second line), so another SSE field before it is tolerated. It
# removes a single optional space after `data:`, as the SSE format allows.
# A chunk with no data line is logged and ignored, rather than raising
# NoMethodError on nil.
#
# @param chunk [String] raw network chunk
# @param env [Object, nil] Faraday env
def handle_error_chunk(chunk, env)
  data_line = chunk.split("\n").find { |line| line.start_with?('data:') }
  unless data_line
    Legion::Extensions::Llm.logger.debug { "Error chunk without data line: #{chunk}" }
    return
  end

  error_data = data_line.delete_prefix('data:').delete_prefix(' ')
  parse_error_from_json(error_data, env, 'Failed to parse error chunk')
end
.handle_error_event(data, env) ⇒ Object
118 119 120 |
# File 'lib/legion/extensions/llm/streaming.rb', line 118
# Handles the payload of an SSE event whose type is `error`.
#
# @param data [String] JSON text of the event
# @param env [Object, nil] Faraday env
def handle_error_event(data, env)
  failure_label = 'Failed to parse error event'
  parse_error_from_json(data, env, failure_label)
end
.handle_failed_response(chunk, buffer, env) ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/legion/extensions/llm/streaming.rb', line 90
# Accumulates the body of a failed response until it parses as JSON, then
# passes it to #handle_parsed_error.
#
# @param chunk [String] next piece of the body
# @param buffer [String] mutable buffer shared across calls for one request
# @param env [Object, nil] Faraday env
# @return [Object, nil] result of #handle_parsed_error, or nil while the body
#   is still incomplete
def handle_failed_response(chunk, buffer, env)
  buffer << chunk
  error_data = Legion::JSON.parse(buffer, symbolize_names: false)
  # The body is complete. Reset so trailing chunks are not re-parsed together
  # with an already-handled body, which would handle the same error twice.
  buffer.clear
  handle_parsed_error(error_data, env)
rescue Legion::JSON::ParseError
  Legion::Extensions::Llm.logger.debug { "Accumulating error chunk: #{chunk}" }
  nil
end
.handle_json_error_chunk(chunk, env) ⇒ Object
81 82 83 |
# File 'lib/legion/extensions/llm/streaming.rb', line 81
# Handles a chunk that is itself a bare JSON error object (no SSE framing).
#
# @param chunk [String] raw JSON text
# @param env [Object, nil] Faraday env
def handle_json_error_chunk(chunk, env)
  failure_label = 'Failed to parse JSON error chunk'
  parse_error_from_json(chunk, env, failure_label)
end
.handle_parsed_error(parsed_data, env) ⇒ Object
130 131 132 133 134 |
# File 'lib/legion/extensions/llm/streaming.rb', line 130
# Converts a decoded error payload into a response object and hands it to
# ErrorMiddleware.
#
# The payload is re-serialized because #parse_streaming_error takes JSON text.
# Only the status from that call is used; the message is discarded here.
#
# @param parsed_data [Object] decoded error payload
# @param env [Object, nil] Faraday env
# @return [Object] whatever ErrorMiddleware.parse_error returns
def handle_parsed_error(parsed_data, env)
  status, _message = parse_streaming_error(parsed_data.to_json)
  response = build_stream_error_response(parsed_data, env, status)
  ErrorMiddleware.parse_error(provider: self, response: response)
end
.handle_sse(chunk, parser, env) ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/legion/extensions/llm/streaming.rb', line 98
# Feeds a chunk to the SSE parser and dispatches each complete event.
#
# `error` events go to #handle_error_event. Every other event is parsed with
# #handle_data and its result is yielded, except the `[DONE]` sentinel.
#
# @param chunk [String] raw network chunk
# @param parser [#feed] SSE parser yielding (type, data) pairs
# @param env [Object, nil] Faraday env
# @yield [data] parsed event payload
# @return [Object] whatever `parser.feed` returns
def handle_sse(chunk, parser, env, &on_event)
  parser.feed(chunk) do |event_type, payload|
    if event_type.to_sym == :error
      handle_error_event(payload, env)
    elsif payload != '[DONE]'
      yield handle_data(payload, env, &on_event)
    end
  end
end
.handle_stream(&block) ⇒ Object
36 37 38 39 40 |
# File 'lib/legion/extensions/llm/streaming.rb', line 36
# Builds the on_data handler, wrapping the consumer block so it only sees
# Hash payloads, converted with #build_chunk.
#
# @yield [chunk] each converted chunk
# @return [Object] whatever #build_on_data_handler returns
def handle_stream(&consumer)
  build_on_data_handler do |data|
    next unless data.is_a?(Hash)

    consumer.call(build_chunk(data))
  end
end
.json_error_payload?(chunk) ⇒ Boolean
77 78 79 |
# File 'lib/legion/extensions/llm/streaming.rb', line 77
# Whether a raw chunk looks like a bare JSON object that mentions an
# "error" key (no SSE framing).
#
# @param chunk [String]
# @return [Boolean]
def json_error_payload?(chunk)
  return false unless chunk.include?('"error"')

  chunk.lstrip.start_with?('{')
end
.parse_error_from_json(data, env, error_message) ⇒ Object
136 137 138 139 140 141 |
# File 'lib/legion/extensions/llm/streaming.rb', line 136
# Parses JSON error text and hands the result to #handle_parsed_error.
#
# @param data [String] JSON text
# @param env [Object, nil] Faraday env
# @param error_message [String] context prefix for the debug log on parse failure
# @return [Object, nil] result of #handle_parsed_error, or nil if `data` is not
#   valid JSON
def parse_error_from_json(data, env, error_message)
  parsed_data = Legion::JSON.parse(data, symbolize_names: false)
  handle_parsed_error(parsed_data, env)
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "#{error_message}: #{e.message}" }
  nil
end
.parse_streaming_error(data) ⇒ Object
122 123 124 125 126 127 128 |
# File 'lib/legion/extensions/llm/streaming.rb', line 122
# Extracts a status and a message from JSON error text.
#
# This implementation always reports status 500. The message is the payload's
# top-level "message" when the payload is a JSON object. Any other valid JSON
# (array, string, number) falls back to a generic message instead of raising
# TypeError.
#
# @param data [String] JSON text
# @return [Array(Integer, String)] status and message
def parse_streaming_error(data)
  error_data = Legion::JSON.parse(data, symbolize_names: false)
  message = error_data['message'] if error_data.is_a?(Hash)
  [500, message || 'Unknown streaming error']
rescue Legion::JSON::ParseError => e
  Legion::Extensions::Llm.logger.debug { "Failed to parse streaming error: #{e.message}" }
  [500, "Failed to parse error: #{data}"]
end
.process_stream_chunk(chunk, parser, env) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/legion/extensions/llm/streaming.rb', line 59
# Routes one raw network chunk to the matching handler.
#
# - A chunk starting with `event: error` goes to #handle_error_chunk.
# - A bare JSON object mentioning "error" goes to #handle_json_error_chunk.
# - Anything else is fed to the SSE parser via #handle_sse, which yields each
#   parsed event to the block.
#
# @param chunk [String] raw bytes received for the request
# @param parser [Object] SSE parser (#build_on_data_handler creates one per handler)
# @param env [Object, nil] Faraday env
# @yield [data] each parsed SSE event payload, via #handle_sse
def process_stream_chunk(chunk, parser, env, &)
  if Legion::Extensions::Llm.config.log_stream_debug
    Legion::Extensions::Llm.logger.debug { "Received chunk: #{chunk}" }
  end

  if error_chunk?(chunk)
    handle_error_chunk(chunk, env)
  elsif json_error_payload?(chunk)
    handle_json_error_chunk(chunk, env)
  else
    # handle_sse already yields every parsed event. Yielding its return value
    # (the parser's feed result) as well would push a non-event into the block.
    handle_sse(chunk, parser, env, &)
  end
end
.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/extensions/llm/streaming.rb', line 10
# POSTs `payload` to `stream_url` with a streaming on_data handler installed,
# and returns the message assembled by a StreamAccumulator.
#
# Every raw chunk is added to the accumulator. The caller's block receives
# only what `accumulator.filtered_chunk` returns (see #build_stream_callback).
#
# @param connection [Object] responds to `post(url, payload) { |req| ... }`
# @param payload [Object] request body, passed straight to `connection.post`
# @param additional_headers [Hash] extra headers; on a key conflict the
#   request's existing headers win
# @yield [chunk] each filtered chunk
# @return [Object] the result of `accumulator.to_message(response)`
def stream_response(connection, payload, additional_headers = {}, &block)
  accumulator = StreamAccumulator.new
  on_chunk = build_stream_callback(accumulator, block)

  response = connection.post stream_url, payload do |req|
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?
    # The Faraday 1 path assigns the handler via `[]=`; Faraday 2 via the
    # `on_data=` accessor.
    if faraday_1?
      req.options[:on_data] = handle_stream(&on_chunk)
    else
      req.options.on_data = handle_stream(&on_chunk)
    end
  end

  message = accumulator.to_message(response)
  Legion::Extensions::Llm.logger.debug { "Stream completed: #{message.content}" }
  message
end