Class: Legion::LLM::API::StreamAssembler
- Inherits:
- Object
- Extended by:
- Legion::Logging::Helper
- Includes:
- ClientTranslators::SharedExtractors, Legion::Logging::Helper
- Defined in:
- lib/legion/llm/api/stream_assembler.rb
Overview
Single SSE state machine for client-format streaming.
Per design 2026-06-09 R5/G6:
- block index bookkeeping (one place, not per-route)
- thinking-before-text ordering
- fallback text emission (when provider returned no chunks)
- server tool result emission (LegionIO tools execute server-side and
emit their result inline)
- push raises StreamClosed after client disconnect — caller treats as
cancellation per G10
- finalize no-ops when closed
- provider-error mid-stream emits client-format error event after
failover ladder is exhausted (G6)
- failover phase points: before-first-byte / mid-text / mid-thinking /
mid-tool-call (the assembler tracks the current phase so the
escalation chain knows what to replay)
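The first two responsibilities in the list above (one shared block-index counter, and closing a thinking block before text opens) can be illustrated with a toy state machine. This is only a sketch under assumptions: `SketchBlocks`, its `thinking`/`text`/`close` methods, and the event tuples are invented for illustration and are not part of the library.

```ruby
# Toy state machine, not the real StreamAssembler: it only models the shared
# block-index counter and the rule that an open block is closed before a
# block of another kind is opened.
class SketchBlocks
  attr_reader :events, :phase

  def initialize
    @events = []
    @next_block_index = 0
    @open = nil # [kind, block_index] of the currently open block
    @phase = :before_first_byte
  end

  def thinking(delta)
    feed(:thinking, delta)
  end

  def text(delta)
    feed(:text, delta)
  end

  # Close whichever block is open, if any.
  def close
    return if @open.nil?

    kind, index = @open
    @events << [:"#{kind}_close", index]
    @open = nil
  end

  private

  def feed(kind, delta)
    if @open.nil? || @open.first != kind
      close # flush the other block kind first
      @open = [kind, @next_block_index]
      @next_block_index += 1
      @events << [:"#{kind}_open", @open.last]
    end
    @phase = :"mid_#{kind}"
    @events << [:"#{kind}_delta", @open.last, delta]
  end
end

blocks = SketchBlocks.new
blocks.thinking('plan')
blocks.text('Hello')
blocks.text(' world')
blocks.close
blocks.events.each { |event| p event }
```

Keeping the counter in one object means every block, whatever its kind, draws its index from the same sequence, which is the point of doing the bookkeeping in one place rather than per route.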
Chunk shape: the assembler accepts canonical chunks (Legion::Extensions::Llm::Canonical::Chunk, post P3 translators) and the legacy provider StreamChunk (lex-llm Responses::StreamChunk). The ChunkAdapter normalizes both to a tiny internal struct so the state machine has one shape to reason about.
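A rough sketch of that normalization idea follows. The four output fields (`text`, `thinking_text`, `thinking_signature`, `tool_calls`) are the ones `push` reads from the adapted chunk; everything else is an assumption made for illustration, including the `SketchChunkAdapter` module and the field names of the two stand-in input structs.

```ruby
# Hypothetical sketch of chunk normalization; not the real ChunkAdapter.
NormalizedChunk = Struct.new(:text, :thinking_text, :thinking_signature, :tool_calls,
                             keyword_init: true)

# Stand-ins for the two input shapes. Their field names are assumptions.
CanonicalLike = Struct.new(:text, :thinking_text, :thinking_signature, :tool_calls,
                           keyword_init: true)
LegacyLike = Struct.new(:content, :thinking, :tool_calls, keyword_init: true)

module SketchChunkAdapter
  module_function

  # Returns a NormalizedChunk, or nil when the input is nil.
  def normalize(chunk)
    return nil if chunk.nil?

    if chunk.respond_to?(:thinking_text)
      NormalizedChunk.new(text: chunk.text,
                          thinking_text: chunk.thinking_text,
                          thinking_signature: chunk.thinking_signature,
                          tool_calls: Array(chunk.tool_calls))
    else
      NormalizedChunk.new(text: chunk.content,
                          thinking_text: chunk.thinking,
                          thinking_signature: nil,
                          tool_calls: Array(chunk.tool_calls))
    end
  end
end

legacy = SketchChunkAdapter.normalize(LegacyLike.new(content: 'hi', thinking: nil, tool_calls: nil))
canonical = SketchChunkAdapter.normalize(
  CanonicalLike.new(text: 'hi', thinking_text: 'hmm', thinking_signature: 'sig', tool_calls: [])
)
```

Normalizing `tool_calls` to an array up front is what lets the state machine call `tool_calls.any?` without a nil check.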
Defined Under Namespace
Modules: ChunkAdapter Classes: StreamClosed
Constant Summary
- PHASES =
Phases per G6; the failover ladder branches on this.
%i[before_first_byte mid_text mid_thinking mid_tool_call finished].freeze
Instance Attribute Summary
-
#failover_events ⇒ Object
readonly
Returns the value of attribute failover_events.
-
#phase ⇒ Object
readonly
Emitter contract. Every client format implements:
on_start(model:, request_id:, input_tokens:)
on_text_open(block_index:) (content_block_start text)
on_text_delta(block_index:, text:)
on_text_close(block_index:)
on_thinking_open(block_index:)
on_thinking_delta(block_index:, text:, signature:)
on_thinking_close(block_index:, signature:)
on_tool_call_open(block_index:, tool_call:, server_tool:)
on_tool_call_delta(block_index:, partial_arguments_json:)
on_tool_call_close(block_index:)
on_server_tool_result(block_index:, tool_call_id:, result_text:)
on_keep_alive
on_message_delta(stop_reason:, output_tokens:)
on_done(stop_reason:, usage:, model:)
on_error(message:, type:, status_code:)
Instance Method Summary
- #closed? ⇒ Boolean
-
#finalize(final_response) ⇒ Object
Close the SSE stream.
-
#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil) ⇒ StreamAssembler
constructor
A new instance of StreamAssembler.
-
#keep_alive! ⇒ Object
Emitters call this to send a keep-alive ping while a buffered tool call is being assembled (so large tool args don’t make the provider look dead, per G6c).
-
#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object
Handle a provider-error after the failover ladder is exhausted (G6).
- #provider_failed(error:, resolution:) ⇒ Object
-
#provider_switched(from:, to:) ⇒ Object
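Record a switch to another provider: adopt the new model name, send a keep-alive if the stream has started and is not finished, and reset the phase to before_first_byte when the switch interrupted a tool call.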
-
#push(chunk) ⇒ Object
Consume one chunk.
- #safe_replay_snapshot ⇒ Object
-
#start! ⇒ Object
Begin the SSE response.
Methods included from ClientTranslators::SharedExtractors
#args_as_json_string, #args_as_object, #extract_content_text, #extract_thinking_text, #legion_routing_explicit_from_env, #legion_routing_from_env, #text_content_type?
Constructor Details
#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil) ⇒ StreamAssembler
Returns a new instance of StreamAssembler.
# File 'lib/legion/llm/api/stream_assembler.rb', line 69
def initialize(emitter:, request_id:, model:, input_tokens: 0,
               emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil)
  @emitter = emitter
  @request_id = request_id
  @model = model.to_s
  @input_tokens = input_tokens.to_i

  settings = Legion::Settings[:llm][:streaming] || {}
  @emit_thinking_blocks = if emit_thinking_blocks.nil?
                            settings[:emit_thinking_blocks] == true
                          else
                            emit_thinking_blocks
                          end
  @tool_call_buffering = (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym
  @keep_alive_interval_ms = (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i

  @phase = :before_first_byte
  @closed = false
  @started = false
  @next_block_index = 0
  @text_block_index = nil
  @thinking_block_index = nil
  @text_block_open = false
  @thinking_block_open = false
  @thinking_block_closed = false
  @full_text = +''
  @full_thinking = +''
  @full_thinking_signature = nil
  @open_tool_calls = {} # tool_call_id => { block_index:, name:, server_tool:, last_args_str: }
  @last_keep_alive_at = nil
end
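The constructor resolves each streaming option in the same order: explicit argument first, then the streaming settings, then a built-in default. The sketch below isolates that precedence. `resolve_stream_options` is a hypothetical helper, `settings` is a plain Hash standing in for `Legion::Settings[:llm][:streaming]`, and the `'passthrough'` value is made up for the example.

```ruby
# Sketch of the constructor's option precedence, extracted into a helper.
def resolve_stream_options(settings:, emit_thinking_blocks: nil, tool_call_buffering: nil,
                           keep_alive_interval_ms: nil)
  thinking = if emit_thinking_blocks.nil?
               # nil means "not specified": fall back to the setting, which
               # must be exactly true to enable thinking blocks.
               settings[:emit_thinking_blocks] == true
             else
               # An explicit false wins over a true setting.
               emit_thinking_blocks
             end

  {
    emit_thinking_blocks: thinking,
    tool_call_buffering: (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym,
    keep_alive_interval_ms: (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i
  }
end

defaults = resolve_stream_options(settings: {})
overrides = resolve_stream_options(
  settings: { emit_thinking_blocks: true, tool_call_buffering: 'passthrough' },
  emit_thinking_blocks: false,
  keep_alive_interval_ms: '250'
)
```

The `nil?` check matters only for the boolean option: with `||`, an explicit `false` would be indistinguishable from "not given" and the setting would silently win.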
Instance Attribute Details
#failover_events ⇒ Object (readonly)
Returns the value of attribute failover_events.
# File 'lib/legion/llm/api/stream_assembler.rb', line 191
def failover_events
  @failover_events
end
#phase ⇒ Object (readonly)
Emitter contract — every client format implements:
on_start(model:, request_id:, input_tokens:)
on_text_open(block_index:) — content_block_start text
on_text_delta(block_index:, text:)
on_text_close(block_index:)
on_thinking_open(block_index:)
on_thinking_delta(block_index:, text:, signature:)
on_thinking_close(block_index:, signature:)
on_tool_call_open(block_index:, tool_call:, server_tool:)
on_tool_call_delta(block_index:, partial_arguments_json:)
on_tool_call_close(block_index:)
on_server_tool_result(block_index:, tool_call_id:, result_text:)
on_keep_alive
on_message_delta(stop_reason:, output_tokens:)
on_done(stop_reason:, usage:, model:)
on_error(message:, type:, status_code:)
Emitters write to the response stream directly; they don’t return values. The assembler catches IOError/EPIPE and flips into closed state.
# File 'lib/legion/llm/api/stream_assembler.rb', line 58
def phase
  @phase
end
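To make the emitter contract concrete, here is a minimal emitter that accepts every callback listed above and writes one SSE frame per call. It is a sketch, not one of the library's client formats: the `SketchEmitter` class, the event names (the callback name minus `on_`), and the JSON payload layout are all assumptions.

```ruby
require 'json'
require 'stringio'

# Hypothetical emitter: one SSE frame per callback. Event names and payload
# layout are illustrative and do not match any real client format.
class SketchEmitter
  CALLBACKS = %i[
    on_start on_text_open on_text_delta on_text_close
    on_thinking_open on_thinking_delta on_thinking_close
    on_tool_call_open on_tool_call_delta on_tool_call_close
    on_server_tool_result on_keep_alive on_message_delta on_done on_error
  ].freeze

  def initialize(io)
    @io = io
  end

  CALLBACKS.each do |name|
    # Each callback takes the keyword arguments from the contract and writes
    # them as the JSON body of a frame. The return value is not used.
    define_method(name) do |**payload|
      event = name.to_s.delete_prefix('on_')
      @io.write("event: #{event}\ndata: #{JSON.generate(payload)}\n\n")
      nil
    end
  end
end

io = StringIO.new
emitter = SketchEmitter.new(io)
emitter.on_start(model: 'example-model', request_id: 'req-1', input_tokens: 3)
emitter.on_text_open(block_index: 0)
emitter.on_text_delta(block_index: 0, text: 'hello')
emitter.on_text_close(block_index: 0)
puts io.string
```

Because the assembler rescues IOError and EPIPE itself, an emitter along these lines can let write errors propagate instead of handling disconnects on its own.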
Instance Method Details
#closed? ⇒ Boolean
# File 'lib/legion/llm/api/stream_assembler.rb', line 102
def closed?
  @closed
end
#finalize(final_response) ⇒ Object
Close the SSE stream. No-ops on a closed stream (per R5). final_response is the executor’s pipeline response (carries tool_calls, thinking, stop_reason, tokens for the trailer events).
# File 'lib/legion/llm/api/stream_assembler.rb', line 150
def finalize(final_response)
  return if @closed

  start! unless @started

  handle_final_thinking(final_response) if @emit_thinking_blocks
  close_thinking_block

  tool_calls = extract_tool_calls(final_response)

  # Emit a fallback text block when nothing was streamed and there are
  # no tool calls. This protects against providers that batched everything
  # into a non-streaming-shaped final response.
  emit_fallback_text(final_response) if !@text_block_open && @full_text.empty? && tool_calls.empty?
  close_text_block

  tool_calls.each { |tc| emit_terminal_tool_call(tc) }

  stop_reason = final_response_stop_reason(final_response, tool_calls)
  usage = final_response_usage(final_response)

  guard { @emitter.on_message_delta(stop_reason: stop_reason, output_tokens: usage[:output_tokens]) }
  guard { @emitter.on_done(stop_reason: stop_reason, usage: usage, model: resolved_model(final_response)) }
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
#keep_alive! ⇒ Object
Emitters call this to send a keep-alive ping while a buffered tool call is being assembled (so large tool args don’t make the provider look dead, per G6c). The assembler invokes it from push() when it detects mid-tool-call gaps; keeping it public lets routes call it directly during long executor pre-steps too.
# File 'lib/legion/llm/api/stream_assembler.rb', line 240
def keep_alive!
  return if @closed

  guard { @emitter.on_keep_alive }
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
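The listing above sends a ping unconditionally; how the assembler spaces pings out using `keep_alive_interval_ms` is not shown on this page. One plausible way to do that spacing is sketched below. The `KeepAliveThrottle` class and its injectable `clock` are assumptions for illustration, not the library's implementation.

```ruby
# Hypothetical throttle: runs the block at most once per interval.
class KeepAliveThrottle
  MONOTONIC_MS = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) }

  def initialize(interval_ms:, clock: MONOTONIC_MS)
    @interval_ms = interval_ms
    @clock = clock
    @last_at = nil
  end

  # Yields and returns true when no ping has been sent yet or at least
  # interval_ms has elapsed since the last one; otherwise returns false.
  def tick
    now = @clock.call
    return false if @last_at && (now - @last_at) < @interval_ms

    @last_at = now
    yield
    true
  end
end

# Drive the throttle with a fake clock so the outcome is deterministic.
times = [0, 1_000, 5_000]
throttle = KeepAliveThrottle.new(interval_ms: 5_000, clock: -> { times.shift })
pings = 0
results = Array.new(3) { throttle.tick { pings += 1 } }
```

A monotonic clock is used rather than wall-clock time so that system clock adjustments cannot suppress or burst pings.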
#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object
Handle a provider-error after the failover ladder is exhausted (G6). Emits the client-format error event so the client sees a clean failure rather than a truncated stream.
# File 'lib/legion/llm/api/stream_assembler.rb', line 181
def on_failover_exhausted(error, status: 502, type: 'overloaded_error')
  return if @closed

  start! unless @started
  guard { @emitter.on_error(message: error.message, type: type, status_code: status) }
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
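The rescue clause in this method follows a pattern shared by the other public methods: a write failure flips the object into a closed state, and later calls return early. The sketch below reproduces only that pattern with stand-ins. `BrokenPipeEmitter`, `SketchErrorPath`, and the `:sent`/`:closed`/`:skipped` return symbols are invented for the example; the real method does not return such values.

```ruby
# Emitter stand-in whose socket is already gone.
class BrokenPipeEmitter
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def on_error(message:, type:, status_code:)
    @calls += 1
    raise Errno::EPIPE
  end
end

# Minimal stand-in for the rescue-and-close pattern; not the real assembler.
class SketchErrorPath
  def initialize(emitter)
    @emitter = emitter
    @closed = false
  end

  def closed?
    @closed
  end

  def fail_stream(error, status: 502, type: 'overloaded_error')
    return :skipped if @closed

    @emitter.on_error(message: error.message, type: type, status_code: status)
    :sent
  rescue IOError, Errno::EPIPE
    @closed = true
    :closed
  end
end

emitter = BrokenPipeEmitter.new
path = SketchErrorPath.new(emitter)
first = path.fail_stream(RuntimeError.new('all providers failed'))
second = path.fail_stream(RuntimeError.new('all providers failed'))
```

After the first failed write the emitter is never touched again, so a disconnected client costs one failed write rather than one per event.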
#provider_failed(error:, resolution:) ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 193
def provider_failed(error:, resolution:)
  return if @closed

  @failover_events ||= []
  @failover_events << {
    provider: resolution.provider,
    instance: resolution.instance,
    model: resolution.model,
    phase: @phase,
    error: error.class.name
  }

  case @phase
  when :before_first_byte
    keep_alive! if @started
  when :mid_text
    close_text_block
  when :mid_thinking
    close_thinking_block
  when :mid_tool_call
    abort_open_tool_calls(reason: error.class.name)
  end
end
#provider_switched(from:, to:) ⇒ Object
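Record a switch to another provider: adopt the new model name, send a keep-alive if the stream has started and is not finished, and reset the phase to before_first_byte when the switch interrupted a tool call. The from: argument is accepted but unused.
# File 'lib/legion/llm/api/stream_assembler.rb', line 217
def provider_switched(from:, to:) # rubocop:disable Lint/UnusedMethodArgument
  return if @closed

  @model = to.model.to_s
  keep_alive! if @started && @phase != :finished
  @phase = :before_first_byte if @phase == :mid_tool_call
end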
#push(chunk) ⇒ Object
Consume one chunk. Raises StreamClosed if the client disconnected. Returns nil; emission happens through the emitter callbacks.
# File 'lib/legion/llm/api/stream_assembler.rb', line 116
def push(chunk)
  raise StreamClosed if @closed

  start! unless @started

  adapted = ChunkAdapter.normalize(chunk)
  return if adapted.nil?

  # Tool call deltas may arrive interleaved with text/thinking. The
  # state machine flushes text/thinking blocks first, then opens the
  # tool block.
  if adapted.tool_calls.any?
    close_text_block
    close_thinking_block
    adapted.tool_calls.each { |tc| handle_tool_call_delta(tc) }
  end

  if adapted.thinking_text && !adapted.thinking_text.empty?
    handle_thinking_delta(adapted.thinking_text, adapted.thinking_signature)
  elsif adapted.thinking_signature && !adapted.thinking_signature.to_s.empty?
    @full_thinking_signature ||= adapted.thinking_signature.to_s
  end

  handle_text_delta(adapted.text) if adapted.text && !adapted.text.empty?
rescue StreamClosed
  raise
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
  raise StreamClosed, e.message
end
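Since `push` raises StreamClosed once the client has disconnected, the caller is expected to treat that as cancellation and stop pulling chunks from the provider. A caller loop might look roughly like the sketch below. `SketchStreamClosed`, `SketchAssembler` (which simulates a disconnect after a fixed number of pushes), and `pump` are stand-ins so the example runs on its own; none of them are library names.

```ruby
# Stand-ins so the sketch is self-contained.
class SketchStreamClosed < StandardError; end

class SketchAssembler
  # fail_after: number of pushes that succeed before the simulated disconnect.
  def initialize(fail_after:)
    @remaining = fail_after
    @closed = false
  end

  def closed?
    @closed
  end

  def push(_chunk)
    raise SketchStreamClosed if @closed

    @remaining -= 1
    return unless @remaining.negative?

    @closed = true
    raise SketchStreamClosed, 'client went away'
  end
end

# Hypothetical route loop: stop consuming provider chunks once the client
# is gone, and report the outcome as a cancellation rather than an error.
def pump(chunks, assembler)
  chunks.each_with_index do |chunk, index|
    assembler.push(chunk)
  rescue SketchStreamClosed
    return { cancelled: true, delivered: index }
  end
  { cancelled: false, delivered: chunks.size }
end

interrupted = pump(%w[a b c], SketchAssembler.new(fail_after: 2))
completed = pump(%w[a b c], SketchAssembler.new(fail_after: 5))
```

Returning early on the first StreamClosed avoids paying for provider tokens that nobody will read.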
#safe_replay_snapshot ⇒ Object
# File 'lib/legion/llm/api/stream_assembler.rb', line 225
def safe_replay_snapshot
  {
    emitted_text: @full_text.dup,
    thinking_emitted: @thinking_block_open || @thinking_block_closed,
    thinking_signature_present: !@full_thinking_signature.to_s.empty?,
    open_tool_call_count: @open_tool_calls.size,
    phase: @phase
  }
end
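The snapshot gives a failover chain enough information to decide what, if anything, needs to be replayed on the next provider. How the real escalation chain uses it is not documented here, so the following is purely a hypothetical consumer: `replay_plan` and its action names (`:none`, `:restart`, `:continue`) are invented for illustration.

```ruby
# Hypothetical consumer of a Hash shaped like safe_replay_snapshot's result.
def replay_plan(snapshot)
  return { action: :none } if snapshot[:phase] == :finished

  nothing_sent = snapshot[:emitted_text].empty? &&
                 !snapshot[:thinking_emitted] &&
                 snapshot[:open_tool_call_count].zero?
  # Nothing reached the client yet, so the next provider can start clean.
  return { action: :restart } if nothing_sent

  # Something was already streamed; carry the emitted text forward so the
  # next provider can pick up where the previous one stopped.
  { action: :continue, already_sent: snapshot[:emitted_text] }
end

fresh = replay_plan({ emitted_text: '', thinking_emitted: false,
                      thinking_signature_present: false,
                      open_tool_call_count: 0, phase: :before_first_byte })
partial = replay_plan({ emitted_text: 'Hel', thinking_emitted: false,
                        thinking_signature_present: false,
                        open_tool_call_count: 0, phase: :mid_text })
```

Note that the snapshot returns a copy of the emitted text (`dup`), so a consumer like this cannot mutate the assembler's internal buffer.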
#start! ⇒ Object
Begin the SSE response. Idempotent; safe to call from finalize too.
# File 'lib/legion/llm/api/stream_assembler.rb', line 107
def start!
  return if @started || @closed

  guard { @emitter.on_start(model: @model, request_id: @request_id, input_tokens: @input_tokens) }
  @started = true
end