Class: Legion::LLM::API::StreamAssembler
- Inherits:
- Object
- Legion::LLM::API::StreamAssembler
- Extended by:
- Legion::Logging::Helper
- Includes:
- ClientTranslators::SharedExtractors, Legion::Logging::Helper
- Defined in:
- lib/legion/llm/api/stream_assembler.rb
Overview
Single SSE state machine for client-format streaming.
Per design 2026-06-09 R5/G6:
- block index bookkeeping (one place, not per-route)
- thinking-before-text ordering
- fallback text emission (when provider returned no chunks)
- server tool result emission (LegionIO tools execute server-side and
emit their result inline)
- push raises StreamClosed after client disconnect — caller treats as
cancellation per G10
- finalize no-ops when closed
- provider-error mid-stream emits client-format error event after
failover ladder is exhausted (G6)
- failover phase points: before-first-byte / mid-text / mid-thinking /
mid-tool-call (the assembler tracks the current phase so the
escalation chain knows what to replay)
Chunk shape: the assembler accepts canonical chunks (Legion::Extensions::Llm::Canonical::Chunk, post P3 translators) and the legacy provider StreamChunk (lex-llm Responses::StreamChunk). The ChunkAdapter normalizes both to a tiny internal struct so the state machine has one shape to reason about.
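The normalization described above can be illustrated with a minimal sketch. The internal field names (text, thinking_text, thinking_signature, tool_calls) are the ones #push reads from the adapted chunk; everything else here is an assumption for illustration, including the stand-in input structs, their field names, and the SketchChunkAdapter module. The real ChunkAdapter is not shown on this page and may differ.

```ruby
# Internal shape the state machine reasons about (field names taken from #push).
NormalizedChunk = Struct.new(:text, :thinking_text, :thinking_signature, :tool_calls,
                             keyword_init: true)

# Hypothetical stand-ins for the two accepted input shapes; field names are assumed.
CanonicalChunk    = Struct.new(:delta_text, :delta_thinking, :signature, :tool_calls,
                               keyword_init: true)
LegacyStreamChunk = Struct.new(:content, :thinking, :tool_calls, keyword_init: true)

module SketchChunkAdapter
  module_function

  # Returns a NormalizedChunk, or nil for nil/unknown input (which #push skips).
  def normalize(chunk)
    case chunk
    when CanonicalChunk
      NormalizedChunk.new(text: chunk.delta_text,
                          thinking_text: chunk.delta_thinking,
                          thinking_signature: chunk.signature,
                          tool_calls: Array(chunk.tool_calls))
    when LegacyStreamChunk
      NormalizedChunk.new(text: chunk.content,
                          thinking_text: chunk.thinking,
                          thinking_signature: nil,
                          tool_calls: Array(chunk.tool_calls))
    end
  end
end
```

Wrapping tool_calls in Array(...) means the state machine can always call tool_calls.any? without a nil check, which matches how #push uses the adapted chunk.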
Defined Under Namespace
Modules: ChunkAdapter Classes: StreamClosed
Constant Summary
- PHASES =
Phases per G6; the failover ladder branches on this.
    %i[before_first_byte mid_text mid_thinking mid_tool_call finished].freeze
Instance Attribute Summary
-
#failover_events ⇒ Object
readonly
Returns the value of attribute failover_events.
-
#phase ⇒ Object
readonly
Emitter contract: every client format implements the on_* callbacks (on_start through on_error) listed in full under Instance Attribute Details.
Instance Method Summary
-
#begin_dispatch_on(lane:) ⇒ Object
Called by the executor after selecting the next lane.
- #closed? ⇒ Boolean
-
#finalize(final_response) ⇒ Object
Close the SSE stream.
-
#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil, initial_lane: nil) ⇒ StreamAssembler
constructor
H-I / sonnet G6 / D-K: initial_lane required so the assembler always has a lane on hand.
-
#keep_alive! ⇒ Object
Emitters call this to send a keep-alive ping while a buffered tool call is being assembled (so large tool args don’t make the provider look dead, per G6c).
-
#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object
Handle a provider-error after the failover ladder is exhausted (G6).
- #provider_failed(error:, resolution:) ⇒ Object
-
#provider_failover_pending!(from:) ⇒ Object
G30 / D-F / N×N invariant 5: mid-stream failover is silent at the SSE wire.
-
#provider_switched(from:, to:) ⇒ Object
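Records the new provider's model, sends a keep-alive if the stream has started and is not finished, and resets the phase to before_first_byte when the switch happens mid-tool-call.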
-
#push(chunk) ⇒ Object
Consume one chunk.
- #safe_replay_snapshot ⇒ Object
-
#start! ⇒ Object
Begin the SSE response.
Methods included from ClientTranslators::SharedExtractors
#apply_canonical_params_to_inference, #args_as_json_string, #args_as_object, #extract_content_text, #extract_thinking_text, #legion_routing_explicit_from_env, #legion_routing_from_env, #normalize_canonical_params, #normalize_response_format, #normalize_stop_sequences, #text_content_type?
Constructor Details
#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil, initial_lane: nil) ⇒ StreamAssembler
H-I / sonnet G6 / D-K: initial_lane required so the assembler always has a lane on hand. Passing initial_lane: nil raises ArgumentError.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 71
    def initialize(emitter:, request_id:, model:, input_tokens: 0,
                   emit_thinking_blocks: nil, tool_call_buffering: nil,
                   keep_alive_interval_ms: nil, initial_lane: nil, **)
      raise ArgumentError, 'initial_lane: must be a non-nil lane Hash' if initial_lane.nil?

      @emitter = emitter
      @request_id = request_id
      @model = model.to_s
      @input_tokens = input_tokens.to_i
      # G30 / D-F: failover tracking for debug trailers. @current_lane tracks the
      # active dispatch lane; @failover_chain records lane IDs for the x-legion-failover-*
      # debug trailers emitted at finalize time. NO custom SSE event (N×N invariant 5).
      @current_lane = initial_lane
      @failover_chain = [initial_lane.is_a?(Hash) ? initial_lane[:id] : initial_lane.to_s]

      settings = Legion::Settings[:llm][:streaming] || {}
      @emit_thinking_blocks = if emit_thinking_blocks.nil?
                                settings[:emit_thinking_blocks] == true
                              else
                                emit_thinking_blocks
                              end
      @tool_call_buffering = (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym
      @keep_alive_interval_ms = (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i

      @phase = :before_first_byte
      @closed = false
      @started = false
      @next_block_index = 0
      @text_block_index = nil
      @thinking_block_index = nil
      @text_block_open = false
      @thinking_block_open = false
      @thinking_block_closed = false
      @full_text = +''
      @full_thinking = +''
      @full_thinking_signature = nil
      @open_tool_calls = {} # tool_call_id => { block_index:, name:, server_tool:, last_args_str: }
      @last_keep_alive_at = nil
    end
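The option precedence in the constructor (explicit argument, then the streaming settings, then a built-in default) can be isolated in a small sketch. resolve_streaming_options is a hypothetical helper written for this illustration, not part of StreamAssembler, and the settings Hash stands in for Legion::Settings[:llm][:streaming].

```ruby
# Hypothetical helper mirroring the precedence used in #initialize:
# explicit argument, then settings, then built-in default.
def resolve_streaming_options(settings:, emit_thinking_blocks: nil,
                              tool_call_buffering: nil, keep_alive_interval_ms: nil)
  {
    # nil means "not specified", so an explicit false still overrides settings.
    emit_thinking_blocks: if emit_thinking_blocks.nil?
                            settings[:emit_thinking_blocks] == true
                          else
                            emit_thinking_blocks
                          end,
    tool_call_buffering: (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym,
    keep_alive_interval_ms: (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i
  }
end

# With empty settings, every option falls through to its default.
resolve_streaming_options(settings: {})

# An explicit false wins over a true setting, which is why the constructor
# tests emit_thinking_blocks with nil? rather than ||.
resolve_streaming_options(settings: { emit_thinking_blocks: true }, emit_thinking_blocks: false)
```

Note that emit_thinking_blocks is only enabled from settings when the stored value is exactly true, so a truthy string in the settings would leave it off.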
Instance Attribute Details
#failover_events ⇒ Object (readonly)
Returns the value of attribute failover_events.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 202
    def failover_events
      @failover_events
    end
#phase ⇒ Object (readonly)
Emitter contract — every client format implements:
on_start(model:, request_id:, input_tokens:)
on_text_open(block_index:) — content_block_start text
on_text_delta(block_index:, text:)
on_text_close(block_index:)
on_thinking_open(block_index:)
on_thinking_delta(block_index:, text:, signature:)
on_thinking_close(block_index:, signature:)
on_tool_call_open(block_index:, tool_call:, server_tool:)
on_tool_call_delta(block_index:, partial_arguments_json:)
on_tool_call_close(block_index:)
on_server_tool_result(block_index:, tool_call_id:, result_text:)
on_keep_alive
on_message_delta(stop_reason:, output_tokens:)
on_done(stop_reason:, usage:, model:)
on_error(message:, type:, status_code:)
Emitters write to the response stream directly; they don’t return values. The assembler catches IOError/EPIPE and flips into closed state.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 58
    def phase
      @phase
    end
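To make the emitter contract concrete, here is a minimal sketch of an emitter that records every callback instead of writing SSE frames, which can be handy in tests. RecordingEmitter is a hypothetical class for illustration; the callback names come from the contract listed above, and a real emitter would write to the response stream rather than to an array.

```ruby
# Hypothetical test double: implements every callback in the emitter contract
# and records each call as [callback_name, keyword_arguments].
class RecordingEmitter
  CALLBACKS = %i[
    on_start on_text_open on_text_delta on_text_close
    on_thinking_open on_thinking_delta on_thinking_close
    on_tool_call_open on_tool_call_delta on_tool_call_close
    on_server_tool_result on_keep_alive on_message_delta on_done on_error
  ].freeze

  attr_reader :events

  def initialize
    @events = []
  end

  CALLBACKS.each do |name|
    define_method(name) do |**kwargs|
      @events << [name, kwargs]
      nil # emitters do not return values
    end
  end
end

emitter = RecordingEmitter.new
emitter.on_start(model: 'example-model', request_id: 'req-1', input_tokens: 3)
emitter.on_keep_alive
```

Because every callback takes keyword arguments only, the double accepts them with a single **kwargs splat; on_keep_alive, which takes none, records an empty Hash.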
Instance Method Details
#begin_dispatch_on(lane:) ⇒ Object
Called by the executor after selecting the next lane. Updates @current_lane and appends the new lane ID to the failover chain for debug trailers.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 262
    def begin_dispatch_on(lane:, **)
      return if @closed

      @current_lane = lane
      @failover_chain << (lane.is_a?(Hash) ? lane[:id] : lane.to_s)
      log.info("[llm][stream_assembler] action=begin_dispatch lane=#{lane.is_a?(Hash) ? lane[:id] : lane} " \
               "request_id=#{@request_id}")
    end
#closed? ⇒ Boolean
    # File 'lib/legion/llm/api/stream_assembler.rb', line 112
    def closed?
      @closed
    end
#finalize(final_response) ⇒ Object
Close the SSE stream. No-ops on a closed stream (per R5). final_response is the executor’s pipeline response (carries tool_calls, thinking, stop_reason, tokens for the trailer events).
    # File 'lib/legion/llm/api/stream_assembler.rb', line 160
    def finalize(final_response)
      return if @closed

      start! unless @started

      handle_final_thinking(final_response) if @emit_thinking_blocks
      close_thinking_block

      tool_calls = extract_tool_calls(final_response)

      # Emit a fallback text block when nothing was streamed and there are
      # no tool calls; this protects against providers that batched everything
      # into a non-streaming-shaped final response.
      emit_fallback_text(final_response) if !@text_block_open && @full_text.empty? && tool_calls.empty?
      close_text_block

      tool_calls.each { |tc| emit_terminal_tool_call(tc) }

      stop_reason = final_response_stop_reason(final_response, tool_calls)
      usage = final_response_usage(final_response)

      guard { @emitter.on_message_delta(stop_reason: stop_reason, output_tokens: usage[:output_tokens]) }
      guard { @emitter.on_done(stop_reason: stop_reason, usage: usage, model: resolved_model(final_response)) }
      emit_failover_trailers!
      @phase = :finished
    rescue IOError, Errno::EPIPE => e
      mark_closed!(e)
    end
#keep_alive! ⇒ Object
Emitters call this to send a keep-alive ping while a buffered tool call is being assembled (so large tool args don’t make the provider look dead, per G6c). The assembler invokes it from push() when it detects mid-tool-call gaps; keeping it public lets routes call it directly during long executor pre-steps too.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 286
    def keep_alive!
      return if @closed

      guard { @emitter.on_keep_alive }
    rescue IOError, Errno::EPIPE => e
      mark_closed!(e)
    end
#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object
Handle a provider-error after the failover ladder is exhausted (G6). Emits the client-format error event so the client sees a clean failure rather than a truncated stream.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 192
    def on_failover_exhausted(error, status: 502, type: 'overloaded_error')
      return if @closed

      start! unless @started
      guard { @emitter.on_error(message: error.message, type: type, status_code: status) }
      @phase = :finished
    rescue IOError, Errno::EPIPE => e
      mark_closed!(e)
    end
#provider_failed(error:, resolution:) ⇒ Object
    # File 'lib/legion/llm/api/stream_assembler.rb', line 204
    def provider_failed(error:, resolution:)
      return if @closed

      @failover_events ||= []
      @failover_events << {
        provider: resolution.provider,
        instance: resolution.instance,
        model: resolution.model,
        phase: @phase,
        error: error.class.name
      }

      case @phase
      when :before_first_byte
        keep_alive! if @started
      when :mid_text
        close_text_block
      when :mid_thinking
        close_thinking_block
      when :mid_tool_call
        abort_open_tool_calls(reason: error.class.name)
      end
    end
#provider_failover_pending!(from:) ⇒ Object
G30 / D-F / N×N invariant 5: mid-stream failover is silent at the SSE wire. Clears the partial canonical buffer so the next provider renders a clean start. Appends a failover marker to @failover_chain for debug trailers at finalize. NO custom SSE event is emitted — that would violate N×N “always translate”.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 240
    def provider_failover_pending!(from:, **)
      return if @closed

      from_id = from.is_a?(Hash) ? from[:id] : from.to_s
      log.warn('[llm][stream_assembler] action=provider_failover_pending ' \
               "from=#{from_id} request_id=#{@request_id}")
      # Clear the partial canonical buffer: partial responses can't cross providers
      # (thinking strip, context invalidation). The next provider starts fresh.
      close_thinking_block if @thinking_block_open
      close_text_block if @text_block_open
      abort_open_tool_calls(reason: 'provider_failover')
      @full_text.clear
      @full_thinking.clear
      @full_thinking_signature = nil
      @phase = :before_first_byte
      @failover_chain << :failover_marker
    rescue IOError, Errno::EPIPE => e
      mark_closed!(e)
    end
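As a rough illustration of how the failover chain accumulates across #initialize, #provider_failover_pending! and #begin_dispatch_on, the sketch below replays the same appends on a plain Array. The lane_id and simulate_failover helpers and the lane IDs 'lane-a' and 'lane-b' are made up for this example; how the chain is rendered into the x-legion-failover-* trailers is not shown on this page and is not modeled here.

```ruby
# Same lane-ID extraction the assembler applies to Hash and non-Hash lanes.
def lane_id(lane)
  lane.is_a?(Hash) ? lane[:id] : lane.to_s
end

# Hypothetical replay of one failover: initial lane, pending marker, next lane.
def simulate_failover(initial_lane, next_lane)
  chain = [lane_id(initial_lane)] # recorded by #initialize
  chain << :failover_marker       # appended by #provider_failover_pending!
  chain << lane_id(next_lane)     # appended by #begin_dispatch_on
  chain
end

simulate_failover({ id: 'lane-a' }, 'lane-b')
```

The chain mixes lane IDs with the :failover_marker symbol, so anything consuming it has to tolerate both kinds of entry.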
#provider_switched(from:, to:) ⇒ Object
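Records the new provider's model, sends a keep-alive if the stream has started and is not finished, and resets the phase to before_first_byte when the switch happens mid-tool-call. The from: argument is accepted but unused.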
    # File 'lib/legion/llm/api/stream_assembler.rb', line 228
    def provider_switched(from:, to:) # rubocop:disable Lint/UnusedMethodArgument
      return if @closed

      @model = to.model.to_s
      keep_alive! if @started && @phase != :finished
      @phase = :before_first_byte if @phase == :mid_tool_call
    end
#push(chunk) ⇒ Object
Consume one chunk. Raises StreamClosed if the client disconnected. Returns nil; emission happens through the emitter callbacks.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 126
    def push(chunk)
      raise StreamClosed if @closed

      start! unless @started

      adapted = ChunkAdapter.normalize(chunk)
      return if adapted.nil?

      # Tool call deltas may arrive interleaved with text/thinking; the
      # state machine flushes text/thinking blocks first then opens the
      # tool block.
      if adapted.tool_calls.any?
        close_text_block
        close_thinking_block
        adapted.tool_calls.each { |tc| handle_tool_call_delta(tc) }
      end

      if adapted.thinking_text && !adapted.thinking_text.empty?
        handle_thinking_delta(adapted.thinking_text, adapted.thinking_signature)
      elsif adapted.thinking_signature && !adapted.thinking_signature.to_s.empty?
        @full_thinking_signature ||= adapted.thinking_signature.to_s
      end

      handle_text_delta(adapted.text) if adapted.text && !adapted.text.empty?
    rescue StreamClosed
      raise
    rescue IOError, Errno::EPIPE => e
      mark_closed!(e)
      raise StreamClosed, e.message
    end
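On the caller side, treating StreamClosed as cancellation might look like the sketch below. StubAssembler, pump and the top-level StreamClosed class are stand-ins written for this illustration (the real exception is namespaced under StreamAssembler); only the behavior that #push raises StreamClosed once the client is gone is taken from this page.

```ruby
# Stand-in for Legion::LLM::API::StreamAssembler::StreamClosed.
class StreamClosed < StandardError; end

# Hypothetical stub: accepts chunks until the simulated client disconnects.
class StubAssembler
  attr_reader :pushed

  def initialize(disconnect_after:)
    @disconnect_after = disconnect_after
    @pushed = []
  end

  def push(chunk)
    raise StreamClosed, 'client disconnected' if @pushed.size >= @disconnect_after

    @pushed << chunk
    nil
  end
end

# Feeds chunks to the assembler; a disconnect ends the loop as a cancellation
# rather than surfacing as an error.
def pump(assembler, chunks)
  chunks.each { |chunk| assembler.push(chunk) }
  :completed
rescue StreamClosed
  :cancelled
end

pump(StubAssembler.new(disconnect_after: 2), %w[a b c])
```

Rescuing at the loop level, instead of per chunk, stops provider consumption as soon as the client is gone.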
#safe_replay_snapshot ⇒ Object
    # File 'lib/legion/llm/api/stream_assembler.rb', line 271
    def safe_replay_snapshot
      {
        emitted_text: @full_text.dup,
        thinking_emitted: @thinking_block_open || @thinking_block_closed,
        thinking_signature_present: !@full_thinking_signature.to_s.empty?,
        open_tool_call_count: @open_tool_calls.size,
        phase: @phase
      }
    end
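This page does not say how the escalation chain consumes the snapshot. As one possible reading, the sketch below applies an assumed policy: a request is replayed from scratch only when nothing client-visible has been emitted. replay_from_scratch? is a hypothetical helper and the policy is an assumption, not the documented behavior; only the snapshot keys come from the method above.

```ruby
# Hypothetical policy (an assumption, not the library's rule): a clean replay is
# only possible when no text, thinking, or tool-call output has reached the client.
def replay_from_scratch?(snapshot)
  snapshot[:emitted_text].empty? &&
    !snapshot[:thinking_emitted] &&
    snapshot[:open_tool_call_count].zero?
end

# Snapshot shaped like the Hash returned by #safe_replay_snapshot.
fresh = {
  emitted_text: '',
  thinking_emitted: false,
  thinking_signature_present: false,
  open_tool_call_count: 0,
  phase: :before_first_byte
}

replay_from_scratch?(fresh)
replay_from_scratch?(fresh.merge(emitted_text: 'Hello', phase: :mid_text))
```

Since emitted_text is a dup of the internal buffer, a caller can hold on to the snapshot while the assembler keeps streaming.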
#start! ⇒ Object
Begin the SSE response. Idempotent; safe to call from finalize too.
    # File 'lib/legion/llm/api/stream_assembler.rb', line 117
    def start!
      return if @started || @closed

      guard { @emitter.on_start(model: @model, request_id: @request_id, input_tokens: @input_tokens) }
      @started = true
    end