Class: Legion::LLM::API::StreamAssembler

Inherits:
Object
Extended by:
Legion::Logging::Helper
Includes:
ClientTranslators::SharedExtractors, Legion::Logging::Helper
Defined in:
lib/legion/llm/api/stream_assembler.rb

Overview

Single SSE state machine for client-format streaming.

Per design 2026-06-09 R5/G6:

- block index bookkeeping (one place, not per-route)
- thinking-before-text ordering
- fallback text emission (when provider returned no chunks)
- server tool result emission (LegionIO tools execute server-side and
  emit their result inline)
- push raises StreamClosed after client disconnect — caller treats as
  cancellation per G10
- finalize no-ops when closed
- provider-error mid-stream emits client-format error event after
  failover ladder is exhausted (G6)
- failover phase points: before-first-byte / mid-text / mid-thinking /
  mid-tool-call (the assembler tracks the current phase so the
  escalation chain knows what to replay)
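The disconnect and cancellation rules in the list above can be illustrated from the caller's side. This is a minimal, self-contained sketch. `ToyAssembler`, `ToyEmitter`, and `drive` are hypothetical stand-ins written for illustration, not the real `StreamAssembler`. They mimic only three documented behaviours: `push` raises `StreamClosed` once the client is gone, the caller treats that as cancellation, and `finalize` no-ops on a closed stream.

```ruby
# Hypothetical stand-ins for illustration only; not the real Legion classes.
class StreamClosed < StandardError; end

class ToyEmitter
  attr_reader :events

  # fail_after: simulate a client disconnect after N successful writes.
  def initialize(fail_after: nil)
    @events = []
    @fail_after = fail_after
  end

  def write(event)
    raise Errno::EPIPE if @fail_after && @events.size >= @fail_after

    @events << event
  end
end

class ToyAssembler
  def initialize(emitter)
    @emitter = emitter
    @closed = false
  end

  def closed?
    @closed
  end

  def push(text)
    raise StreamClosed if @closed

    @emitter.write([:text_delta, text])
  rescue Errno::EPIPE, IOError => e
    @closed = true # flip into the closed state, then signal the caller
    raise StreamClosed, e.message
  end

  def finalize
    return if @closed # no-op once the client is gone

    @emitter.write([:done])
  rescue Errno::EPIPE, IOError
    @closed = true
  end
end

# Caller pattern: StreamClosed from push means the client went away,
# so stop pulling chunks and report a cancellation instead of an error.
def drive(assembler, chunks)
  chunks.each { |c| assembler.push(c) }
  assembler.finalize
  :completed
rescue StreamClosed
  :cancelled
end
```

The important design point is that the broken-pipe error is converted into a single, assembler-specific exception, so the caller needs one `rescue` clause to distinguish "client left" from a genuine provider failure.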

Chunk shape: the assembler accepts canonical chunks (Legion::Extensions::Llm::Canonical::Chunk, post P3 translators) and the legacy provider StreamChunk (lex-llm Responses::StreamChunk). The ChunkAdapter normalizes both to a tiny internal struct so the state machine has one shape to reason about.
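The normalization idea can be sketched as follows. The output field names (`text`, `thinking_text`, `thinking_signature`, `tool_calls`) are the ones `push` reads from the adapted chunk. The two input shapes are assumptions made purely for illustration: a Hash with `:type`/`:delta` keys standing in for a canonical chunk, and an object exposing `#content` standing in for a legacy `StreamChunk`. `NormalizedChunk` and `ToyChunkAdapter` are hypothetical names, not the real `ChunkAdapter`.

```ruby
# Hypothetical sketch: field names mirror what StreamAssembler#push reads;
# both input shapes below are assumptions, not the real chunk classes.
NormalizedChunk = Struct.new(:text, :thinking_text, :thinking_signature, :tool_calls,
                             keyword_init: true)

module ToyChunkAdapter
  module_function

  # Returns a NormalizedChunk, or nil for input the state machine should skip.
  def normalize(chunk)
    case chunk
    when nil then nil
    when Hash then from_canonical(chunk)
    else
      return nil unless chunk.respond_to?(:content)

      from_legacy(chunk)
    end
  end

  # Assumed canonical shape: { type: :text | :thinking | :tool_call, delta:, signature: }
  def from_canonical(hash)
    case hash[:type]
    when :text
      NormalizedChunk.new(text: hash[:delta].to_s, tool_calls: [])
    when :thinking
      NormalizedChunk.new(thinking_text: hash[:delta].to_s,
                          thinking_signature: hash[:signature], tool_calls: [])
    when :tool_call
      NormalizedChunk.new(tool_calls: [hash[:delta]])
    end # unknown types fall through to nil
  end

  # Assumed legacy shape: an object exposing #content and, optionally, #tool_calls.
  def from_legacy(obj)
    calls = obj.respond_to?(:tool_calls) ? Array(obj.tool_calls) : []
    NormalizedChunk.new(text: obj.content.to_s, tool_calls: calls)
  end
end
```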

Defined Under Namespace

Modules: ChunkAdapter
Classes: StreamClosed

Constant Summary

PHASES =

Phases per G6; the failover ladder branches on this.

%i[
  before_first_byte
  mid_text
  mid_thinking
  mid_tool_call
  finished
].freeze
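The way a stream might move through these phases can be sketched as a small transition function. Only the `PHASES` list comes from this page; `next_phase` and the chunk-kind symbols are hypothetical. The rule that the phase follows the most recently emitted content kind is an assumption for illustration, since the handlers that update `@phase` during `push` are not shown here. `:finished` is treated as terminal because both `finalize` and `on_failover_exhausted` set it last.

```ruby
# PHASES copied from the constant above; next_phase is a hypothetical helper.
PHASES = %i[before_first_byte mid_text mid_thinking mid_tool_call finished].freeze

# Assumed rule: the phase tracks the kind of content most recently emitted;
# :finished is terminal.
def next_phase(current, chunk_kind)
  raise ArgumentError, "unknown phase #{current}" unless PHASES.include?(current)
  return :finished if current == :finished

  case chunk_kind
  when :text      then :mid_text
  when :thinking  then :mid_thinking
  when :tool_call then :mid_tool_call
  when :done      then :finished
  else current # e.g. keep-alives do not change the phase
  end
end
```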

Instance Attribute Summary

Instance Method Summary

Methods included from ClientTranslators::SharedExtractors

#args_as_json_string, #args_as_object, #extract_content_text, #extract_thinking_text, #legion_routing_explicit_from_env, #legion_routing_from_env, #text_content_type?

Constructor Details

#initialize(emitter:, request_id:, model:, input_tokens: 0, emit_thinking_blocks: nil, tool_call_buffering: nil, keep_alive_interval_ms: nil) ⇒ StreamAssembler

Returns a new instance of StreamAssembler.



# File 'lib/legion/llm/api/stream_assembler.rb', line 69

def initialize(emitter:, request_id:, model:, input_tokens: 0,
               emit_thinking_blocks: nil, tool_call_buffering: nil,
               keep_alive_interval_ms: nil)
  @emitter = emitter
  @request_id = request_id
  @model = model.to_s
  @input_tokens = input_tokens.to_i

  settings = Legion::Settings[:llm][:streaming] || {}
  @emit_thinking_blocks = if emit_thinking_blocks.nil?
                            settings[:emit_thinking_blocks] == true
                          else
                            emit_thinking_blocks
                          end
  @tool_call_buffering = (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym
  @keep_alive_interval_ms = (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i

  @phase = :before_first_byte
  @closed = false
  @started = false
  @next_block_index = 0
  @text_block_index = nil
  @thinking_block_index = nil
  @text_block_open = false
  @thinking_block_open = false
  @thinking_block_closed = false
  @full_text = +''
  @full_thinking = +''
  @full_thinking_signature = nil
  @open_tool_calls = {} # tool_call_id => { block_index:, name:, server_tool:, last_args_str: }
  @last_keep_alive_at = nil
end
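The constructor resolves each streaming option in the same order: explicit argument, then `Legion::Settings[:llm][:streaming]`, then a hard-coded default. The sketch below reproduces that precedence with a plain Hash in place of `Legion::Settings`. `resolve_streaming_options` is a hypothetical helper, not part of the class. It makes one asymmetry visible: `emit_thinking_blocks` is tested with `nil?`, so an explicit `false` wins, while `tool_call_buffering` and `keep_alive_interval_ms` use `||`, so an explicit `false` falls through to the settings value or the default.

```ruby
# Hypothetical helper mirroring StreamAssembler#initialize's precedence rules,
# with settings passed in as a plain Hash instead of Legion::Settings.
def resolve_streaming_options(settings, emit_thinking_blocks: nil,
                              tool_call_buffering: nil, keep_alive_interval_ms: nil)
  settings ||= {}
  {
    # nil means "not given"; an explicit false is kept as-is.
    emit_thinking_blocks: if emit_thinking_blocks.nil?
                            settings[:emit_thinking_blocks] == true
                          else
                            emit_thinking_blocks
                          end,
    # || treats false like nil, so false falls through to settings/default.
    tool_call_buffering: (tool_call_buffering || settings[:tool_call_buffering] || :buffered).to_sym,
    keep_alive_interval_ms: (keep_alive_interval_ms || settings[:keep_alive_interval_ms] || 5_000).to_i
  }
end
```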

Instance Attribute Details

#failover_events ⇒ Object (readonly)

Returns the value of attribute failover_events.



# File 'lib/legion/llm/api/stream_assembler.rb', line 191

def failover_events
  @failover_events
end

#phase ⇒ Object (readonly)

Emitter contract — every client format implements:

on_start(model:, request_id:, input_tokens:)
on_text_open(block_index:)                   — content_block_start text
on_text_delta(block_index:, text:)
on_text_close(block_index:)
on_thinking_open(block_index:)
on_thinking_delta(block_index:, text:, signature:)
on_thinking_close(block_index:, signature:)
on_tool_call_open(block_index:, tool_call:, server_tool:)
on_tool_call_delta(block_index:, partial_arguments_json:)
on_tool_call_close(block_index:)
on_server_tool_result(block_index:, tool_call_id:, result_text:)
on_keep_alive
on_message_delta(stop_reason:, output_tokens:)
on_done(stop_reason:, usage:, model:)
on_error(message:, type:, status_code:)

Emitters write to the response stream directly; they don’t return values. The assembler catches IOError/EPIPE and flips into the closed state.



# File 'lib/legion/llm/api/stream_assembler.rb', line 58

def phase
  @phase
end
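A test double that satisfies the emitter contract listed above can record each callback instead of writing SSE frames. `RecordingEmitter` is a hypothetical class written for illustration. The callback names and keyword arguments come from the contract, and the `fail_on:` option is an assumption used to simulate a client that disconnects on a particular callback.

```ruby
# Hypothetical test double implementing the documented emitter contract by
# recording every callback as [name, kwargs].
class RecordingEmitter
  CALLBACKS = %i[
    on_start on_text_open on_text_delta on_text_close
    on_thinking_open on_thinking_delta on_thinking_close
    on_tool_call_open on_tool_call_delta on_tool_call_close
    on_server_tool_result on_keep_alive on_message_delta on_done on_error
  ].freeze

  attr_reader :events

  # fail_on: callback name that should raise Errno::EPIPE (simulated disconnect).
  def initialize(fail_on: nil)
    @events = []
    @fail_on = fail_on
  end

  CALLBACKS.each do |name|
    define_method(name) do |**kwargs|
      raise Errno::EPIPE if name == @fail_on

      @events << [name, kwargs]
      nil # emitters write to the stream; they don't return values
    end
  end

  def names
    @events.map(&:first)
  end
end
```

Because every callback raises `Errno::EPIPE` in the same way a broken socket would, the double can exercise the assembler's closed-state handling without a real network connection.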

Instance Method Details

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/legion/llm/api/stream_assembler.rb', line 102

def closed?
  @closed
end

#finalize(final_response) ⇒ Object

Close the SSE stream. No-ops on a closed stream (per R5). final_response is the executor’s pipeline response; it carries the tool_calls, thinking, stop_reason, and token counts used for the trailer events.



# File 'lib/legion/llm/api/stream_assembler.rb', line 150

def finalize(final_response)
  return if @closed

  start! unless @started

  handle_final_thinking(final_response) if @emit_thinking_blocks
  close_thinking_block

  tool_calls = extract_tool_calls(final_response)

  # Emit a fallback text block when nothing was streamed and there are
  # no tool calls — protects against providers that batched everything
  # into a non-streaming-shaped final response.
  emit_fallback_text(final_response) if !@text_block_open && @full_text.empty? && tool_calls.empty?

  close_text_block

  tool_calls.each { |tc| emit_terminal_tool_call(tc) }

  stop_reason = final_response_stop_reason(final_response, tool_calls)
  usage = final_response_usage(final_response)
  guard { @emitter.on_message_delta(stop_reason: stop_reason, output_tokens: usage[:output_tokens]) }
  guard { @emitter.on_done(stop_reason: stop_reason, usage: usage, model: resolved_model(final_response)) }
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
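The order in which `finalize` emits its trailer can be modelled as a list of callback names. `finalize_order` is a hypothetical helper. It omits the thinking-block step and assumes two things the private helpers on this page do not show: the fallback text path emits an open plus a delta, and each terminal tool call emits one open/close pair. The fallback condition itself is copied from the source: no open text block, no streamed text, and no tool calls.

```ruby
# Hypothetical model of the trailer order in StreamAssembler#finalize.
# Thinking handling is omitted; helper internals are assumed, not documented.
def finalize_order(started:, text_block_open:, full_text:, tool_calls:)
  order = []
  order << :on_start unless started
  if !text_block_open && full_text.empty? && tool_calls.empty?
    # Fallback: nothing was streamed and there are no tool calls.
    order.push(:on_text_open, :on_text_delta) # assumed fallback text block
    text_block_open = true
  end
  order << :on_text_close if text_block_open
  tool_calls.each { order.push(:on_tool_call_open, :on_tool_call_close) } # assumed
  order.push(:on_message_delta, :on_done) # always last, in this order
end
```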

#keep_alive! ⇒ Object

Send a keep-alive ping through the emitter’s on_keep_alive callback, for example while a buffered tool call is being assembled, so that large tool arguments don’t make the provider look dead (per G6c). The assembler invokes it from push() when it detects mid-tool-call gaps, and from provider_failed and provider_switched during failover; keeping it public lets routes call it directly during long executor pre-steps too.



# File 'lib/legion/llm/api/stream_assembler.rb', line 240

def keep_alive!
  return if @closed

  guard { @emitter.on_keep_alive }
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
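The constructor stores `keep_alive_interval_ms` and `@last_keep_alive_at`, but `keep_alive!` as shown sends a ping unconditionally. One way a caller could rate-limit pings to that interval is sketched below. `KeepAliveThrottle` is a hypothetical class, and whether the real assembler throttles this way is not shown on this page. The clock is injectable so the logic can be tested without sleeping; by default it uses Ruby's monotonic clock in milliseconds.

```ruby
# Hypothetical throttle: run the ping block only if at least interval_ms
# have elapsed since the last ping. The clock is injectable for testing.
class KeepAliveThrottle
  def initialize(interval_ms: 5_000,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) })
    @interval_ms = interval_ms
    @clock = clock
    @last_at = nil
  end

  # Yields (to send the ping) when due; returns true if a ping was sent.
  def tick
    now = @clock.call
    return false if @last_at && now - @last_at < @interval_ms

    @last_at = now
    yield
    true
  end
end
```

Usage would look like `throttle.tick { assembler.keep_alive! }` inside a long-running loop. A monotonic clock is used so that wall-clock adjustments cannot suppress or burst pings.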

#on_failover_exhausted(error, status: 502, type: 'overloaded_error') ⇒ Object

Handle a provider error after the failover ladder is exhausted (G6). Emits the client-format error event (status 502 and type overloaded_error by default) so the client sees a clean failure rather than a truncated stream.



# File 'lib/legion/llm/api/stream_assembler.rb', line 181

def on_failover_exhausted(error, status: 502, type: 'overloaded_error')
  return if @closed

  start! unless @started
  guard { @emitter.on_error(message: error.message, type: type, status_code: status) }
  @phase = :finished
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
end
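A failover driver that ends in `on_failover_exhausted` might look like the following sketch. `run_ladder` and the `Resolution` struct are hypothetical. The struct mirrors only the `provider`, `instance`, and `model` readers that `provider_failed` uses. The driver tries each provider in order, reports every failure through `provider_failed`, and hands the last error to `on_failover_exhausted` once nothing is left to try.

```ruby
# Hypothetical failover driver; Resolution mirrors the readers provider_failed uses.
Resolution = Struct.new(:provider, :instance, :model)

# attempts: array of [resolution, callable] pairs tried in order.
def run_ladder(assembler, attempts)
  raise ArgumentError, 'no providers to try' if attempts.empty?

  last_error = nil
  attempts.each do |resolution, call|
    begin
      return call.call # first success wins
    rescue StandardError => e
      last_error = e
      assembler.provider_failed(error: e, resolution: resolution)
    end
  end
  # Ladder exhausted: emit the client-format error (502 / overloaded_error by default).
  assembler.on_failover_exhausted(last_error)
  nil
end
```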

#provider_failed(error:, resolution:) ⇒ Object



# File 'lib/legion/llm/api/stream_assembler.rb', line 193

def provider_failed(error:, resolution:)
  return if @closed

  @failover_events ||= []
  @failover_events << {
    provider: resolution.provider,
    instance: resolution.instance,
    model:    resolution.model,
    phase:    @phase,
    error:    error.class.name
  }

  case @phase
  when :before_first_byte
    keep_alive! if @started
  when :mid_text
    close_text_block
  when :mid_thinking
    close_thinking_block
  when :mid_tool_call
    abort_open_tool_calls(reason: error.class.name)
  end
end
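The bookkeeping in `provider_failed` splits into two parts: recording a failover event and choosing a cleanup step based on the current phase. The sketch below restates both as pure functions so the mapping can be checked in isolation. `ResolutionInfo`, `failover_event`, and `cleanup_for` are hypothetical names. The returned symbols name the private methods the real code calls for each phase, and `:none` covers `:finished` and a `:before_first_byte` stream that has not started.

```ruby
# Hypothetical pure restatement of provider_failed's bookkeeping.
ResolutionInfo = Struct.new(:provider, :instance, :model)

# Same keys as the hash appended to @failover_events.
def failover_event(error, resolution, phase)
  { provider: resolution.provider, instance: resolution.instance,
    model: resolution.model, phase: phase, error: error.class.name }
end

# Cleanup step chosen for each phase, mirroring the case statement above.
def cleanup_for(phase, started:)
  case phase
  when :before_first_byte then started ? :keep_alive : :none
  when :mid_text          then :close_text_block
  when :mid_thinking      then :close_thinking_block
  when :mid_tool_call     then :abort_open_tool_calls
  else :none # :finished: nothing left to clean up
  end
end
```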

#provider_switched(from:, to:) ⇒ Object

# File 'lib/legion/llm/api/stream_assembler.rb', line 217

def provider_switched(from:, to:) # rubocop:disable Lint/UnusedMethodArgument
  return if @closed

  @model = to.model.to_s
  keep_alive! if @started && @phase != :finished
  @phase = :before_first_byte if @phase == :mid_tool_call
end
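The state changes in `provider_switched` can be restated as a pure function. `switch_provider` and `SwitchResult` are hypothetical names used for illustration. The function adopts the new model name, requests a keep-alive only if the stream has started and is not finished, and rewinds `:mid_tool_call` to `:before_first_byte`. The rewind is presumably safe because `provider_failed` has already aborted the open tool calls for that phase.

```ruby
# Hypothetical pure mirror of provider_switched's state changes.
SwitchResult = Struct.new(:model, :phase, :keep_alive, keyword_init: true)

def switch_provider(phase:, started:, to_model:)
  SwitchResult.new(
    model: to_model.to_s,                          # @model = to.model.to_s
    keep_alive: started && phase != :finished,     # ping only a live stream
    phase: phase == :mid_tool_call ? :before_first_byte : phase
  )
end
```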

#push(chunk) ⇒ Object

Consume one chunk. Raises StreamClosed if the client disconnected. Returns nil; emission happens through the emitter callbacks.



# File 'lib/legion/llm/api/stream_assembler.rb', line 116

def push(chunk)
  raise StreamClosed if @closed

  start! unless @started

  adapted = ChunkAdapter.normalize(chunk)
  return if adapted.nil?

  # Tool call deltas may arrive interleaved with text/thinking — the
  # state machine flushes text/thinking blocks first then opens the
  # tool block.
  if adapted.tool_calls.any?
    close_text_block
    close_thinking_block
    adapted.tool_calls.each { |tc| handle_tool_call_delta(tc) }
  end

  if adapted.thinking_text && !adapted.thinking_text.empty?
    handle_thinking_delta(adapted.thinking_text, adapted.thinking_signature)
  elsif adapted.thinking_signature && !adapted.thinking_signature.to_s.empty?
    @full_thinking_signature ||= adapted.thinking_signature.to_s
  end

  handle_text_delta(adapted.text) if adapted.text && !adapted.text.empty?
rescue StreamClosed
  raise
rescue IOError, Errno::EPIPE => e
  mark_closed!(e)
  raise StreamClosed, e.message
end
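The per-chunk ordering in `push` can be illustrated with a simplified model. Tool-call deltas first close any open text or thinking block, and within one chunk thinking is handled before text. `PushOrderModel` and `Chunk` are hypothetical. The model also assumes that only one text-like block is open at a time and that a new content kind closes the previous block. The real private handlers are not shown on this page and may differ, for example in how late thinking is treated.

```ruby
# Hypothetical, simplified model of push's per-chunk ordering.
Chunk = Struct.new(:text, :thinking_text, :tool_calls, keyword_init: true)

class PushOrderModel
  attr_reader :events

  def initialize
    @events = []
    @open = nil # :text, :thinking, or nil
  end

  def push(chunk)
    # 1) Tool-call deltas flush any open text/thinking block first.
    if Array(chunk.tool_calls).any?
      close_open_block
      chunk.tool_calls.each { |tc| @events << [:tool_call_delta, tc] }
    end
    # 2) Thinking before text within the same chunk.
    emit(:thinking, chunk.thinking_text) if present?(chunk.thinking_text)
    emit(:text, chunk.text) if present?(chunk.text)
    self
  end

  private

  def present?(str)
    !str.nil? && !str.empty?
  end

  # Opens a block of the given kind if needed (closing any other), then writes a delta.
  def emit(kind, str)
    unless @open == kind
      close_open_block
      @events << [:"#{kind}_open"]
      @open = kind
    end
    @events << [:"#{kind}_delta", str]
  end

  def close_open_block
    return unless @open

    @events << [:"#{@open}_close"]
    @open = nil
  end
end
```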

#safe_replay_snapshot ⇒ Object



# File 'lib/legion/llm/api/stream_assembler.rb', line 225

def safe_replay_snapshot
  {
    emitted_text:               @full_text.dup,
    thinking_emitted:           @thinking_block_open || @thinking_block_closed,
    thinking_signature_present: !@full_thinking_signature.to_s.empty?,
    open_tool_call_count:       @open_tool_calls.size,
    phase:                      @phase
  }
end
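A failover ladder could consult this snapshot to decide how to continue on another provider. The policy below, `replay_strategy`, is a hypothetical example and not the ladder's actual rule. It replays from scratch only when nothing user-visible has been emitted. It gives up once the stream is finished. Otherwise it resumes after the already-emitted prefix, which is an assumption about what a client would want.

```ruby
# Hypothetical policy over the hash returned by safe_replay_snapshot.
def replay_strategy(snapshot)
  return :abort if snapshot[:phase] == :finished

  nothing_visible = snapshot[:emitted_text].empty? &&
                    !snapshot[:thinking_emitted] &&
                    snapshot[:open_tool_call_count].zero?
  return :replay_from_scratch if nothing_visible

  # Something already reached the client: continue after the emitted
  # prefix instead of re-sending it (assumed policy).
  :resume_after_prefix
end
```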

#start! ⇒ Object

Begin the SSE response. Idempotent; safe to call from finalize too.



# File 'lib/legion/llm/api/stream_assembler.rb', line 107

def start!
  return if @started || @closed

  guard { @emitter.on_start(model: @model, request_id: @request_id, input_tokens: @input_tokens) }
  @started = true
end
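The guard at the top of `start!` is what makes it idempotent, and the same guard lets `push`, `finalize`, and `on_failover_exhausted` all call it safely. The stub below reproduces just that guard. `StartGuard` is a hypothetical class, and its counter stands in for the `@emitter.on_start(...)` call.

```ruby
# Hypothetical stub reproducing start!'s guard: on_start fires at most once,
# and never after the stream has been marked closed.
class StartGuard
  attr_reader :starts

  def initialize(closed: false)
    @started = false
    @closed = closed
    @starts = 0
  end

  def start!
    return if @started || @closed

    @starts += 1 # stands in for @emitter.on_start(...)
    @started = true
  end
end
```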