Class: Legion::LLM::API::StreamAssembler

Inherits:
Object
Extended by:
Legion::Logging::Helper
Includes:
ClientTranslators::SharedExtractors, Legion::Logging::Helper
Defined in:
lib/legion/llm/api/stream_assembler.rb

Overview

Single SSE state machine for client-format streaming.

Per design 2026-06-09 R5/G6:

- block index bookkeeping (one place, not per-route)
- thinking-before-text ordering
- fallback text emission (when provider returned no chunks)
- server tool result emission (LegionIO tools execute server-side and
  emit their result inline)
- push raises StreamClosed after a client disconnect; the caller treats it
  as a cancellation per G10
- finalize no-ops when closed
- a provider error mid-stream emits a client-format error event once the
  failover ladder is exhausted (G6)
- failover phase points: before-first-byte / mid-text / mid-thinking /
  mid-tool-call (the assembler tracks the current phase so the
  escalation chain knows what to replay)
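The first two bullets (block-index bookkeeping in one place, thinking-before-text ordering) can be illustrated with a minimal sketch. BlockIndexSketch and its event tuples are hypothetical and far simpler than the real state machine; in particular, ignoring thinking that arrives after text has started is an assumption of the sketch, not documented behaviour.

```ruby
# Hypothetical sketch, not the real StreamAssembler: one counter hands out
# block indices, and a thinking block is always closed before text opens.
class BlockIndexSketch
  attr_reader :events

  def initialize
    @next_block_index = 0 # single source of truth for block indices
    @thinking_index = nil
    @text_index = nil
    @thinking_closed = false
    @events = []
  end

  def thinking(delta)
    # Assumption of this sketch: thinking that arrives after text has started is ignored.
    return if @text_index || @thinking_closed

    @thinking_index ||= open_block(:thinking)
    @events << [:thinking_delta, @thinking_index, delta]
  end

  def text(delta)
    close_thinking # thinking-before-text: flush thinking before the first text block
    @text_index ||= open_block(:text)
    @events << [:text_delta, @text_index, delta]
  end

  def finish
    close_thinking
    @events << [:close, @text_index] if @text_index
  end

  private

  def open_block(kind)
    index = @next_block_index
    @next_block_index += 1
    @events << [:open, kind, index]
    index
  end

  def close_thinking
    return if @thinking_index.nil? || @thinking_closed

    @events << [:close, @thinking_index]
    @thinking_closed = true
  end
end
```

Feeding thinking("plan"), then text("Hello"), then finish yields a thinking block at index 0 that is closed before the text block opens at index 1.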

Chunk shape: the assembler accepts canonical chunks (Legion::Extensions::Llm::Canonical::Chunk, post P3 translators) and the legacy provider StreamChunk (lex-llm Responses::StreamChunk). The ChunkAdapter normalizes both to a tiny internal struct so the state machine has one shape to reason about.
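The normalization step can be sketched as follows. The four Adapted fields (text, thinking_text, thinking_signature, tool_calls) are the ones push() reads; everything else, including both input Structs and their :type values, is a hypothetical stand-in for the real Canonical::Chunk and StreamChunk shapes.

```ruby
# Hypothetical sketch in the spirit of ChunkAdapter. Both input shapes are
# invented stand-ins; the real Canonical::Chunk and StreamChunk differ.
Adapted = Struct.new(:text, :thinking_text, :thinking_signature, :tool_calls)

CanonicalChunkSketch = Struct.new(:type, :delta, :signature, :tool_call)
LegacyStreamChunkSketch = Struct.new(:content, :thinking, :tool_calls)

module ChunkAdapterSketch
  module_function

  # Returns an Adapted struct, or nil for chunks the state machine should skip.
  def normalize(chunk)
    case chunk
    when CanonicalChunkSketch
      case chunk.type
      when :text then Adapted.new(chunk.delta, nil, nil, [])
      when :thinking then Adapted.new(nil, chunk.delta, chunk.signature, [])
      when :tool_call then Adapted.new(nil, nil, nil, [chunk.tool_call])
      end # unknown types fall through to nil
    when LegacyStreamChunkSketch
      Adapted.new(chunk.content, chunk.thinking, nil, Array(chunk.tool_calls))
    end # nil and any other object also yield nil
  end
end
```

Returning nil for unrecognized input matches push(), which returns early when the adapter yields nil.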

Defined Under Namespace

Modules: ChunkAdapter
Classes: StreamClosed

Constant Summary

PHASES

Phases per G6; the failover ladder branches on this.

PHASES = %i[
  before_first_byte
  mid_text
  mid_thinking
  mid_tool_call
  finished
].freeze

Instance Attribute Summary

Instance Method Summary

Methods included from ClientTranslators::SharedExtractors

#apply_canonical_params_to_inference, #args_as_json_string, #args_as_object, #extract_content_text, #extract_thinking_text, #legion_routing_explicit_from_env, #legion_routing_from_env, #normalize_canonical_params, #normalize_response_format, #normalize_stop_sequences, #text_content_type?

Constructor Details

#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil, initial_lane: nil) ⇒ StreamAssembler

H-I / sonnet G6 / D-K: initial_lane required so the assembler always has a lane on hand. Passing initial_lane: nil raises ArgumentError.

Raises:

  • (ArgumentError)


# File 'lib/legion/llm/api/stream_assembler.rb', line 71

def initialize(emitter:, request_id:, model:, input_tokens: 0,
               emit_thinking_blocks: nil, tool_call_buffering: nil,
               keep_alive_interval_ms: nil, initial_lane: nil, **)
  raise ArgumentError, 'initial_lane: must be a non-nil lane Hash' if initial_lane.nil?

  @emitter = emitter
  @request_id = request_id
  @model = model.to_s
  @input_tokens = input_tokens.to_i

  # G30 / D-F: failover tracking for debug trailers. @current_lane tracks the
  # active dispatch lane; @failover_chain records lane IDs for the x-legion-failover-*
  # debug trailers emitted at finalize time. NO custom SSE event (N×N invariant 5).
  @current_lane = initial_lane
  @failover_chain = [initial_lane.is_a?(Hash) ? initial_lane[:id] : initial_lane.to_s]

  settings = Legion::Settings[:llm][:streaming] || {}
  @emit_thinking_blocks = if emit_thinking_blocks.nil?
                            settings[:emit_thinking_blocks] == true
                          else
                            emit_thinking_blocks
                          end
  @tool_call_buffering = (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym
  @keep_alive_interval_ms = (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i

  @phase = :before_first_byte
  @closed = false
  @started = false
  @next_block_index = 0
  @text_block_index = nil
  @thinking_block_index = nil
  @text_block_open = false
  @thinking_block_open = false
  @thinking_block_closed = false
  @full_text = +''
  @full_thinking = +''
  @full_thinking_signature = nil
  @open_tool_calls = {} # tool_call_id => { block_index:, name:, server_tool:, last_args_str: }
  @last_keep_alive_at = nil
end
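The constructor resolves three streaming options with different precedence rules. The sketch below isolates that logic; resolve_streaming_options and lane_id are hypothetical helper names, and settings stands in for Legion::Settings[:llm][:streaming] as a plain Hash.

```ruby
# Hypothetical helper mirroring the constructor's precedence:
# explicit keyword > streaming settings Hash > built-in default.
def resolve_streaming_options(settings, emit_thinking_blocks: nil, tool_call_buffering: nil,
                              keep_alive_interval_ms: nil)
  settings ||= {}
  {
    # nil? check: an explicit false overrides a true setting
    emit_thinking_blocks: emit_thinking_blocks.nil? ? settings[:emit_thinking_blocks] == true : emit_thinking_blocks,
    # || chain: nil (or false) falls through to settings, then the default
    tool_call_buffering: (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym,
    keep_alive_interval_ms: (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i
  }
end

# Same lane-ID extraction the constructor uses to seed @failover_chain.
def lane_id(lane)
  lane.is_a?(Hash) ? lane[:id] : lane.to_s
end
```

Note the asymmetry: only emit_thinking_blocks distinguishes nil from false, so callers can force thinking blocks off even when settings enable them.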

Instance Attribute Details

#failover_events ⇒ Object (readonly)

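Returns the failover events recorded by #provider_failed: one Hash per failure with provider, instance, model, phase, and error (the error class name). The value is nil until the first #provider_failed call, because the array is created lazily.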



# File 'lib/legion/llm/api/stream_assembler.rb', line 202

def failover_events
  @failover_events
end

#phase ⇒ Object (readonly)

Emitter contract: every client format implements:

  on_start(model:, request_id:, input_tokens:)
  on_text_open(block_index:)                   # content_block_start (text)
  on_text_delta(block_index:, text:)
  on_text_close(block_index:)
  on_thinking_open(block_index:)
  on_thinking_delta(block_index:, text:, signature:)
  on_thinking_close(block_index:, signature:)
  on_tool_call_open(block_index:, tool_call:, server_tool:)
  on_tool_call_delta(block_index:, partial_arguments_json:)
  on_tool_call_close(block_index:)
  on_server_tool_result(block_index:, tool_call_id:, result_text:)
  on_keep_alive
  on_message_delta(stop_reason:, output_tokens:)
  on_done(stop_reason:, usage:, model:)
  on_error(message:, type:, status_code:)

Emitters write to the response stream directly; they don't return values. The assembler rescues IOError and Errno::EPIPE and flips into the closed state.
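For tests, a recording double that implements every callback in the contract can stand in for a real client format. RecordingEmitter and its fail_after: option are hypothetical; fail_after simulates a client disconnect by raising Errno::EPIPE, which exercises the assembler's closed-state path.

```ruby
# Hypothetical test double: implements each emitter callback by recording
# [name, kwargs] instead of writing SSE frames.
class RecordingEmitter
  CALLBACKS = %i[
    on_start on_text_open on_text_delta on_text_close
    on_thinking_open on_thinking_delta on_thinking_close
    on_tool_call_open on_tool_call_delta on_tool_call_close
    on_server_tool_result on_keep_alive on_message_delta on_done on_error
  ].freeze

  attr_reader :calls

  def initialize(fail_after: nil)
    @calls = []
    @fail_after = fail_after # raise Errno::EPIPE once this many calls were recorded
  end

  CALLBACKS.each do |name|
    define_method(name) do |**kwargs|
      raise Errno::EPIPE if @fail_after && @calls.size >= @fail_after

      @calls << [name, kwargs]
      nil # emitters return no value
    end
  end
end
```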



# File 'lib/legion/llm/api/stream_assembler.rb', line 58

def phase
  @phase
end

Instance Method Details

#begin_dispatch_on(lane:) ⇒ Object

Called by the executor after selecting the next lane. Updates @current_lane and appends the new lane ID to the failover chain for debug trailers.



# File 'lib/legion/llm/api/stream_assembler.rb', line 262

def begin_dispatch_on(lane:, **)
  return if @closed

  @current_lane = lane
  @failover_chain << (lane.is_a?(Hash) ? lane[:id] : lane.to_s)
  log.info("[llm][stream_assembler] action=begin_dispatch lane=#{lane.is_a?(Hash) ? lane[:id] : lane} " \
           "request_id=#{@request_id}")
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/legion/llm/api/stream_assembler.rb', line 112

def closed?
  @closed
end

#finalize(final_response) ⇒ Object

Close the SSE stream. No-ops on a closed stream (per R5). final_response is the executor’s pipeline response (carries tool_calls, thinking, stop_reason, tokens for the trailer events).



# File 'lib/legion/llm/api/stream_assembler.rb', line 160

def finalize(final_response)
  return if @closed

  start! unless @started

  handle_final_thinking(final_response) if @emit_thinking_blocks
  close_thinking_block

  tool_calls = extract_tool_calls(final_response)

  # Emit a fallback text block when nothing was streamed and there are
  # no tool calls — protects against providers that batched everything
  # into a non-streaming-shaped final response.
  emit_fallback_text(final_response) if !@text_block_open && @full_text.empty? && tool_calls.empty?

  close_text_block

  tool_calls.each { |tc| emit_terminal_tool_call(tc) }

  stop_reason = final_response_stop_reason(final_response, tool_calls)
  usage = final_response_usage(final_response)
  guard { @emitter.on_message_delta(stop_reason: stop_reason, output_tokens: usage[:output_tokens]) }
  guard { @emitter.on_done(stop_reason: stop_reason, usage: usage, model: resolved_model(final_response)) }
  emit_failover_trailers!
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
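finalize() emits in a fixed order. The pure function below is a hypothetical reduction (finalize_plan is not part of the class) that omits thinking blocks and block closes, and only shows when fallback text appears relative to tool calls and the trailing message_delta, done, and failover trailers.

```ruby
# Hypothetical reduction of finalize(): returns the ordered list of emissions
# instead of performing them. Thinking blocks and block closes are omitted.
def finalize_plan(closed:, started:, text_block_open:, full_text:, tool_calls:)
  return [] if closed # finalize no-ops on a closed stream

  plan = []
  plan << :start unless started # start! is idempotent, so finalize may open the stream itself
  # Fallback text only when no text block is open, nothing was streamed, and there are no tool calls.
  plan << :fallback_text if !text_block_open && full_text.empty? && tool_calls.empty?
  plan.concat([:tool_call] * tool_calls.size) # terminal tool calls follow the text block
  plan.concat(%i[message_delta done failover_trailers])
end
```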

#keep_alive! ⇒ Object

Send a keep-alive ping through the emitter's on_keep_alive callback while a buffered tool call is being assembled, so that large tool arguments don't make the provider look dead (per G6c). The assembler invokes it from push() when it detects mid-tool-call gaps; it is public so routes can also call it directly during long executor pre-steps. No-ops on a closed stream.



# File 'lib/legion/llm/api/stream_assembler.rb', line 286

def keep_alive!
  return if @closed

  guard { @emitter.on_keep_alive }
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
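The constructor keeps @keep_alive_interval_ms (default 5,000 ms) and @last_keep_alive_at, but the gap detection that decides when push() pings is not shown on this page. One plausible throttle, offered purely as an assumption (KeepAliveThrottle and its injectable clock are hypothetical):

```ruby
# Hypothetical throttle: pings only when interval_ms has elapsed since the
# last ping (or since the first check). Not the documented implementation.
class KeepAliveThrottle
  def initialize(interval_ms:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) })
    @interval_ms = interval_ms
    @clock = clock # injectable so tests can control time
    @last_at = nil
  end

  # Yields (sends the ping) and returns true if the interval elapsed; otherwise false.
  def maybe_ping
    now = @clock.call
    @last_at ||= now # the first check starts the timer instead of pinging
    return false if now - @last_at < @interval_ms

    @last_at = now
    yield
    true
  end
end
```

A monotonic clock is used so wall-clock adjustments cannot trigger or suppress pings.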

#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object

Handle a provider error after the failover ladder is exhausted (G6). Emits the client-format error event so the client sees a clean failure rather than a truncated stream.



# File 'lib/legion/llm/api/stream_assembler.rb', line 192

def on_failover_exhausted(error, status: 502, type: 'overloaded_error')
  return if @closed

  start! unless @started
  guard { @emitter.on_error(message: error.message, type: type, status_code: status) }
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
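A hypothetical executor loop showing one way the public failover hooks could be sequenced: provider_failover_pending! and begin_dispatch_on when moving to the next lane, finalize on success, and on_failover_exhausted once every lane has failed. The call order, run_ladder, the lane Hash with a :call lambda, and StubAssembler are all assumptions; the real executor is not documented on this page.

```ruby
# Hypothetical stub that records which assembler hooks were called.
class StubAssembler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def begin_dispatch_on(lane:)
    @calls << [:begin_dispatch_on, lane[:id]]
  end

  def provider_failover_pending!(from:)
    @calls << [:failover_pending, from[:id]]
  end

  def finalize(_response)
    @calls << [:finalize]
  end

  def on_failover_exhausted(error)
    @calls << [:exhausted, error.message]
  end
end

# Hypothetical ladder: lanes[0] is assumed to be the constructor's initial_lane,
# so begin_dispatch_on is only called for later lanes.
def run_ladder(assembler, lanes)
  raise ArgumentError, 'no lanes' if lanes.empty?

  last_error = nil
  lanes.each_with_index do |lane, i|
    if i.positive?
      assembler.provider_failover_pending!(from: lanes[i - 1])
      assembler.begin_dispatch_on(lane: lane)
    end
    begin
      response = lane[:call].call
    rescue StandardError => e
      last_error = e
      next # try the next lane
    end
    assembler.finalize(response)
    return response
  end
  assembler.on_failover_exhausted(last_error)
  nil
end
```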

#provider_failed(error:, resolution:) ⇒ Object



# File 'lib/legion/llm/api/stream_assembler.rb', line 204

def provider_failed(error:, resolution:)
  return if @closed

  @failover_events ||= []
  @failover_events << {
    provider: resolution.provider,
    instance: resolution.instance,
    model:    resolution.model,
    phase:    @phase,
    error:    error.class.name
  }

  case @phase
  when :before_first_byte
    keep_alive! if @started
  when :mid_text
    close_text_block
  when :mid_thinking
    close_thinking_block
  when :mid_tool_call
    abort_open_tool_calls(reason: error.class.name)
  end
end
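provider_failed has no docstring; reading its body, it appends one Hash per failure to failover_events and then performs a phase-specific cleanup. A reduced sketch of those two steps, with Resolution as a stand-in Struct for the resolution object and the cleanup expressed as a lookup of the methods the case statement calls (:keep_alive_if_started is a made-up label for the `keep_alive! if @started` branch):

```ruby
# Stand-in for the resolution object (only the three readers provider_failed uses).
Resolution = Struct.new(:provider, :instance, :model)

# Cleanup per phase, mirroring the case statement; :finished has no entry.
CLEANUP_BY_PHASE = {
  before_first_byte: :keep_alive_if_started, # hypothetical label
  mid_text: :close_text_block,
  mid_thinking: :close_thinking_block,
  mid_tool_call: :abort_open_tool_calls
}.freeze

# Same Hash shape provider_failed appends to @failover_events.
def failover_event(error, resolution, phase)
  {
    provider: resolution.provider,
    instance: resolution.instance,
    model: resolution.model,
    phase: phase,
    error: error.class.name
  }
end

def cleanup_for(phase)
  CLEANUP_BY_PHASE[phase] # nil for :finished, like the case statement's fall-through
end
```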

#provider_failover_pending!(from:) ⇒ Object

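G30 / D-F / N×N invariant 5: mid-stream failover is silent at the SSE wire. Clears the partial canonical buffer (closes open thinking and text blocks, aborts open tool calls, and discards the accumulated text, thinking, and signature) and resets the phase to before_first_byte, so the next provider renders a clean start. Appends a failover marker to @failover_chain for debug trailers at finalize. NO custom SSE event is emitted; that would violate the N×N “always translate” invariant.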



# File 'lib/legion/llm/api/stream_assembler.rb', line 240

def provider_failover_pending!(from:, **)
  return if @closed

  from_id = from.is_a?(Hash) ? from[:id] : from.to_s
  log.warn('[llm][stream_assembler] action=provider_failover_pending ' \
           "from=#{from_id} request_id=#{@request_id}")
  # Clear the partial canonical buffer — partial responses can't cross providers
  # (thinking strip, context invalidation). The next provider starts fresh.
  close_thinking_block if @thinking_block_open
  close_text_block     if @text_block_open
  abort_open_tool_calls(reason: 'provider_failover')
  @full_text.clear
  @full_thinking.clear
  @full_thinking_signature = nil
  @phase = :before_first_byte
  @failover_chain << :failover_marker
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
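Three methods append to @failover_chain: the constructor seeds it with the initial lane ID, provider_failover_pending! appends :failover_marker, and begin_dispatch_on appends the next lane's ID. FailoverChainSketch is a hypothetical class that keeps only those append rules; calling pending before begin_dispatch_on is an assumed ordering.

```ruby
# Hypothetical class keeping only the @failover_chain append rules.
class FailoverChainSketch
  attr_reader :chain

  def initialize(initial_lane)
    @chain = [lane_id(initial_lane)] # constructor seeds the chain
  end

  def provider_failover_pending!(from:) # from: is only logged in the real method
    @chain << :failover_marker
  end

  def begin_dispatch_on(lane:)
    @chain << lane_id(lane)
  end

  private

  def lane_id(lane)
    lane.is_a?(Hash) ? lane[:id] : lane.to_s
  end
end
```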

#provider_switched(from:, to:) ⇒ Object

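Updates the model reported to the client to to.model, sends a keep-alive if the stream has started and is not finished, and resets a mid_tool_call phase to before_first_byte. The from: argument is currently unused.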



# File 'lib/legion/llm/api/stream_assembler.rb', line 228

def provider_switched(from:, to:) # rubocop:disable Lint/UnusedMethodArgument
  return if @closed

  @model = to.model.to_s
  keep_alive! if @started && @phase != :finished
  @phase = :before_first_byte if @phase == :mid_tool_call
end

#push(chunk) ⇒ Object

Consume one chunk. Raises StreamClosed if the client disconnected. Returns nil; emission happens through the emitter callbacks.



# File 'lib/legion/llm/api/stream_assembler.rb', line 126

def push(chunk)
  raise StreamClosed if @closed

  start! unless @started

  adapted = ChunkAdapter.normalize(chunk)
  return if adapted.nil?

  # Tool call deltas may arrive interleaved with text/thinking — the
  # state machine flushes text/thinking blocks first then opens the
  # tool block.
  if adapted.tool_calls.any?
    close_text_block
    close_thinking_block
    adapted.tool_calls.each { |tc| handle_tool_call_delta(tc) }
  end

  if adapted.thinking_text && !adapted.thinking_text.empty?
    handle_thinking_delta(adapted.thinking_text, adapted.thinking_signature)
  elsif adapted.thinking_signature && !adapted.thinking_signature.to_s.empty?
    @full_thinking_signature ||= adapted.thinking_signature.to_s
  end

  handle_text_delta(adapted.text) if adapted.text && !adapted.text.empty?
rescue StreamClosed
  raise
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
  raise StreamClosed, e.message
end
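Route code that drives push() is expected to treat StreamClosed as a cancellation (G10). A hypothetical sketch of that caller loop; pump, DisconnectingAssembler, and the top-level StreamClosed (a stand-in for StreamAssembler::StreamClosed, whose superclass is assumed to be StandardError here) are illustrative only.

```ruby
# Stand-in for Legion::LLM::API::StreamAssembler::StreamClosed (superclass assumed).
class StreamClosed < StandardError; end

# Hypothetical caller loop: a client disconnect is a cancellation, not an error.
def pump(assembler, chunks, on_cancel:)
  chunks.each { |chunk| assembler.push(chunk) }
  :completed
rescue StreamClosed
  on_cancel.call # e.g. abort the upstream provider request
  :cancelled
end

# Hypothetical stub whose push raises StreamClosed after N successful chunks.
class DisconnectingAssembler
  def initialize(disconnect_after:)
    @remaining = disconnect_after
  end

  def push(_chunk)
    raise StreamClosed if @remaining.zero?

    @remaining -= 1
    nil
  end
end
```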

#safe_replay_snapshot ⇒ Object



# File 'lib/legion/llm/api/stream_assembler.rb', line 271

def safe_replay_snapshot
  {
    emitted_text:               @full_text.dup,
    thinking_emitted:           @thinking_block_open || @thinking_block_closed,
    thinking_signature_present: !@full_thinking_signature.to_s.empty?,
    open_tool_call_count:       @open_tool_calls.size,
    phase:                      @phase
  }
end

#start! ⇒ Object

Begin the SSE response. Idempotent; safe to call from finalize too.



# File 'lib/legion/llm/api/stream_assembler.rb', line 117

def start!
  return if @started || @closed

  guard { @emitter.on_start(model: @model, request_id: @request_id, input_tokens: @input_tokens) }
  @started = true
end