Class: LLM::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/llm/stream.rb,
lib/llm/stream/queue.rb

Overview

Note:

The ‘on_*` callbacks run inline with the streaming parser. They therefore block streaming progress and should generally return as quickly as possible.

The LLM::Stream class provides the callback interface for streamed model output in llm.rb.

A stream object can be an instance of LLM::Stream or a subclass that overrides the callbacks it needs. For basic streaming, llm.rb also accepts any object that implements ‘#<<`. #queue provides a small helper for collecting asynchronous tool work started from a callback, and #tool_not_found returns an in-band tool error when a streamed tool cannot be resolved.

The most common callback is #on_content, which also maps to #<<. Providers may also call #on_reasoning_content and #on_tool_call when that data is available. Runtime features such as context compaction may also emit lifecycle callbacks like #on_transform or #on_compaction.

Defined Under Namespace

Classes: Queue

Public callbacks collapse

Error handlers collapse

Instance Method Summary collapse

Instance Method Details

#ctxLLM::Context?

Returns the current context, if one was attached to the stream.

Returns:



36
37
38
# File 'lib/llm/stream.rb', line 36

def ctx
  extra[:ctx]
end

#extraHash

Returns extra context associated with the current streamed request.

Returns:

  • (Hash)


29
30
31
# File 'lib/llm/stream.rb', line 29

def extra
  @extra ||= LLM::Object.from({})
end

#find_tool(name) ⇒ LLM::Function?

Resolves a streamed tool call against the current request tools first, then falls back to the global function registry.

Parameters:

  • name (String)

Returns:



183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/llm/stream.rb', line 183

def find_tool(name)
  tool = tools.find do |candidate|
    candidate_name =
      if candidate.respond_to?(:function)
        candidate.function.name
      else
        candidate.name
      end
    candidate_name.to_s == name.to_s
  end
  tool&.then { _1.respond_to?(:function) ? _1.function : _1 } ||
    LLM::Function.find_by_name(name)
end

#on_compaction(ctx, compactor) ⇒ nil

Called before a context compaction starts.

Parameters:

Returns:

  • (nil)


138
139
140
# File 'lib/llm/stream.rb', line 138

def on_compaction(ctx, compactor)
  nil
end

#on_compaction_finish(ctx, compactor) ⇒ nil

Called after a context compaction finishes.

Parameters:

Returns:

  • (nil)


147
148
149
# File 'lib/llm/stream.rb', line 147

def on_compaction_finish(ctx, compactor)
  nil
end

#on_content(content) ⇒ nil Also known as: <<

Called when visible assistant output is streamed.

Parameters:

  • content (String)

    A chunk of assistant-visible text.

Returns:

  • (nil)


63
64
65
# File 'lib/llm/stream.rb', line 63

def on_content(content)
  nil
end

#on_reasoning_content(content) ⇒ nil

Called when reasoning output is streamed separately from visible content.

Parameters:

  • content (String)

    A chunk of reasoning text.

Returns:

  • (nil)


73
74
75
# File 'lib/llm/stream.rb', line 73

def on_reasoning_content(content)
  nil
end

#on_tool_call(tool, error) ⇒ nil

Note:

A stream implementation may start tool execution here, for example by pushing ‘ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)` onto #queue. Mixed strategies can also be selected per tool, such as `tool.mcp? ? ctx.spawn(tool, :task) : ctx.spawn(tool, :ractor)`. When a streamed tool cannot be resolved, `error` is passed as an Function::Return. It can be sent back to the model, allowing the tool-call path to recover and the session to continue. Streamed tool resolution now prefers the current request tools, so LLM.function, MCP tools, bound tool instances, and normal LLM::Tool classes can all resolve through the same request-local path. The current `:ractor` mode is for class-based tools and does not support MCP tools.

Called when a streamed tool call has been fully constructed.

Parameters:

Returns:

  • (nil)


97
98
99
# File 'lib/llm/stream.rb', line 97

def on_tool_call(tool, error)
  nil
end

#on_tool_return(tool, result) ⇒ nil

Note:

This callback runs when #wait resolves work that was queued from #on_tool_call, such as values returned by ‘ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)`.

Called when queued streamed tool work returns.

Parameters:

Returns:

  • (nil)


111
112
113
# File 'lib/llm/stream.rb', line 111

def on_tool_return(tool, result)
  nil
end

#on_transform(ctx, transformer) ⇒ nil

Called before a context transformer rewrites a prompt.

Parameters:

Returns:

  • (nil)


120
121
122
# File 'lib/llm/stream.rb', line 120

def on_transform(ctx, transformer)
  nil
end

#on_transform_finish(ctx, transformer) ⇒ nil

Called after a context transformer finishes rewriting a prompt.

Parameters:

Returns:

  • (nil)


129
130
131
# File 'lib/llm/stream.rb', line 129

def on_transform_finish(ctx, transformer)
  nil
end

#queueLLM::Stream::Queue

Returns a lazily-initialized queue for tool results or spawned work.

Returns:



43
44
45
# File 'lib/llm/stream.rb', line 43

def queue
  @queue ||= Queue.new(self)
end

#tool_not_found(tool) ⇒ LLM::Function::Return

Note:

This is mainly useful as a fallback from #on_tool_call. It should be uncommon in normal use, since streamed tool callbacks only run for tools already defined in the context.

Returns a function return describing a streamed tool that could not be resolved.

Parameters:

Returns:



163
164
165
166
167
# File 'lib/llm/stream.rb', line 163

def tool_not_found(tool)
  LLM::Function::Return.new(tool.id, tool.name, {
    error: true, type: LLM::NoSuchToolError.name, message: "tool not found"
  })
end

#toolsArray<LLM::Function, LLM::Tool>

Returns the tool definitions available for the current streamed request. This prefers request-local tools attached to the stream and falls back to the current context defaults when present.

Returns:



174
175
176
# File 'lib/llm/stream.rb', line 174

def tools
  extra[:tools] || ctx&.params&.dig(:tools) || []
end

#waitArray<LLM::Function::Return>

Waits for queued tool work to finish and returns function results. Any passed arguments are ignored because queued work is waited according to the actual task types already present in the queue.

Returns:



52
53
54
# File 'lib/llm/stream.rb', line 52

def wait(*)
  queue.wait
end