Class: LLM::Stream
- Inherits: Object (LLM::Stream < Object)
- Defined in: lib/llm/stream.rb, lib/llm/stream/queue.rb
Overview
Note: The `on_*` callbacks run inline with the streaming parser, so they block streaming progress and should return as quickly as possible.
The LLM::Stream class provides the callback interface for streamed model output in llm.rb.
A stream object can be an instance of LLM::Stream or a subclass that overrides the callbacks it needs. For basic streaming, llm.rb also accepts any object that implements `#<<`. #queue provides a small helper for collecting asynchronous tool work started from a callback.
The most common callback is #on_content, which also maps to #<<. Providers may also call #on_reasoning_content and #on_tool_call when that data is available. Runtime features such as context compaction may also emit lifecycle callbacks like #on_transform or #on_compaction.
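As a minimal sketch of the `#<<` path described above, the collector below is a plain Ruby object rather than an LLM::Stream subclass. The class name `ChunkCollector` is hypothetical, and the chunks are fed in by hand; how a stream object is attached to a request is not covered in this section and is left out.

```ruby
# Hypothetical collector (not part of llm.rb): per the overview, any object
# that implements #<< can receive streamed content.
class ChunkCollector
  attr_reader :chunks

  def initialize
    @chunks = []
  end

  # Keep this cheap: callbacks run inline with the streaming parser.
  def <<(chunk)
    @chunks << chunk.to_s
    self # returning self allows chained appends
  end

  # Joins everything received so far into one string.
  def text
    @chunks.join
  end
end

collector = ChunkCollector.new
collector << "Hello, " << "world"
puts collector.text
```

Appending to an in-memory array keeps the callback fast; slower work such as rendering or persistence can happen after streaming finishes.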
Defined Under Namespace
Classes: Queue
Public callbacks
- #on_compaction(ctx, compactor) ⇒ nil
  Called before a context compaction starts.
- #on_compaction_finish(ctx, compactor) ⇒ nil
  Called after a context compaction finishes.
- #on_content(content) ⇒ nil (also: #<<)
  Called when visible assistant output is streamed.
- #on_reasoning_content(content) ⇒ nil
  Called when reasoning output is streamed separately from visible content.
- #on_tool_call(tool, error) ⇒ nil
  Called when a streamed tool call has been fully constructed.
- #on_tool_return(tool, result) ⇒ nil
  Called when queued streamed tool work returns.
- #on_transform(ctx, transformer) ⇒ nil
  Called before a context transformer rewrites a prompt.
- #on_transform_finish(ctx, transformer) ⇒ nil
  Called after a context transformer finishes rewriting a prompt.
Finders
- #__find__(name) ⇒ LLM::Function?
  Resolves a streamed tool call against the current request tools first, then falls back to the global function registry.
Instance Method Summary
- #ctx ⇒ LLM::Context?
  Returns the current context, if one was attached to the stream.
- #extra ⇒ Hash
  Returns extra context associated with the current streamed request.
- #queue ⇒ LLM::Stream::Queue
  Returns a lazily-initialized queue for tool results or spawned work.
- #wait ⇒ Array<LLM::Function::Return>
  Waits for queued tool work to finish and returns function results.
Instance Method Details
#__find__(name) ⇒ LLM::Function?
Resolves a streamed tool call against the current request tools first, then falls back to the global function registry.
    # File 'lib/llm/stream.rb', line 159
    def __find__(name)
      tools = extra[:tools] || ctx&.params&.dig(:tools) || []
      tool = tools.find do
        candidate = _1.respond_to?(:function) ? _1.function.name : _1.name
        candidate.to_s == name.to_s
      end
      if tool
        tool.respond_to?(:function) ? tool.function : tool
      else
        LLM::Function.find_by_name(name)
      end
    end
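The lookup order can be illustrated with stand-in objects. In this sketch, `Fn`, `Tool`, `REGISTRY`, and `find_function` are hypothetical names invented for the example; they only mimic the shape of the listing above (request tools first, then a global fallback) and are not llm.rb classes.

```ruby
# Stand-in types for illustration only; they are not llm.rb classes.
Fn = Struct.new(:name)        # plays the role of a function object
Tool = Struct.new(:function)  # plays the role of a tool wrapping a function

# Hypothetical global registry used as the fallback.
REGISTRY = { "clock" => Fn.new("clock") }.freeze

# Looks in the request-local tools first, then in the registry.
def find_function(name, request_tools)
  tool = request_tools.find do |t|
    candidate = t.respond_to?(:function) ? t.function.name : t.name
    candidate.to_s == name.to_s
  end
  return REGISTRY[name.to_s] unless tool

  tool.respond_to?(:function) ? tool.function : tool
end

request_tools = [Tool.new(Fn.new("weather")), Fn.new(:search)]
p find_function("weather", request_tools) # unwrapped from the Tool stand-in
p find_function("clock", request_tools)   # found only in the registry
```

Comparing names through `to_s` lets string and symbol names match each other, which is the same normalization the listing applies.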
#ctx ⇒ LLM::Context?
Returns the current context, if one was attached to the stream.
    # File 'lib/llm/stream.rb', line 35
    def ctx
      extra[:ctx]
    end
#extra ⇒ Hash
Returns extra context associated with the current streamed request.
    # File 'lib/llm/stream.rb', line 28
    def extra
      @extra ||= LLM::Object.from({})
    end
#on_compaction(ctx, compactor) ⇒ nil
Called before a context compaction starts.
    # File 'lib/llm/stream.rb', line 137
    def on_compaction(ctx, compactor)
      nil
    end
#on_compaction_finish(ctx, compactor) ⇒ nil
Called after a context compaction finishes.
    # File 'lib/llm/stream.rb', line 146
    def on_compaction_finish(ctx, compactor)
      nil
    end
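Paired before/after callbacks such as these lend themselves to timing. The sketch below is one possible use, written as a standalone class so it runs without llm.rb; `CompactionTimer` and `durations` are hypothetical names, and a real stream would presumably subclass LLM::Stream and override the same two methods.

```ruby
# Hypothetical stand-in that mirrors the two compaction callbacks.
class CompactionTimer
  attr_reader :durations

  def initialize
    @durations = []
    @started_at = nil
  end

  # Records a monotonic start time and returns quickly.
  def on_compaction(ctx, compactor)
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    nil
  end

  # Stores the elapsed seconds if a start time was recorded.
  def on_compaction_finish(ctx, compactor)
    return nil unless @started_at

    @durations << Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at
    @started_at = nil
    nil
  end
end
```

Both methods return `nil`, matching the documented return type, and do only constant-time work so they do not hold up the caller.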
#on_content(content) ⇒ nil Also known as: <<
Called when visible assistant output is streamed.
    # File 'lib/llm/stream.rb', line 62
    def on_content(content)
      nil
    end
#on_reasoning_content(content) ⇒ nil
Called when reasoning output is streamed separately from visible content.
    # File 'lib/llm/stream.rb', line 72
    def on_reasoning_content(content)
      nil
    end
#on_tool_call(tool, error) ⇒ nil
Called when a streamed tool call has been fully constructed.
A stream implementation may start tool execution here, for example by pushing `ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)` onto #queue. Mixed strategies can also be selected per tool, such as `tool.mcp? ? ctx.spawn(tool, :task) : ctx.spawn(tool, :ractor)`.
When a streamed tool cannot be resolved, `error` is passed as a Function::Return. It can be sent back to the model, allowing the tool-call path to recover and the session to continue.
Streamed tool resolution now prefers the current request tools, so LLM.function, MCP tools, bound tool instances, and normal LLM::Tool classes can all resolve through the same request-local path. The current `:ractor` mode is for class-based tools and does not support MCP tools.
    # File 'lib/llm/stream.rb', line 96
    def on_tool_call(tool, error)
      nil
    end
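The per-tool strategy choice and the error path can be sketched without llm.rb. Everything here is a stand-in: `FakeTool`, `strategy_for`, and `plan_tool_call` are hypothetical names, the queue is a plain Array, and where a real override would push `ctx.spawn(tool, strategy)` this sketch only records the chosen strategy. Pushing the error onto the queue is an assumption about how it gets sent back to the model.

```ruby
# Stand-in tool for illustration; only #mcp? mirrors the text above.
FakeTool = Struct.new(:name, :mcp) do
  def mcp?
    mcp
  end
end

# Picks a strategy per tool: :ractor does not support MCP tools,
# so MCP tools fall back to :task.
def strategy_for(tool)
  tool.mcp? ? :task : :ractor
end

# Mirrors the shape of on_tool_call(tool, error). An unresolved tool
# arrives with an error, which is queued as-is (assumption); otherwise
# the tool name and chosen strategy are queued.
def plan_tool_call(queue, tool, error)
  queue << (error || [tool.name, strategy_for(tool)])
  nil
end

queue = []
plan_tool_call(queue, FakeTool.new("search", true), nil)
plan_tool_call(queue, FakeTool.new("resize", false), nil)
plan_tool_call(queue, nil, "unknown tool: foo")
p queue
```

Returning `nil` right after queuing keeps the callback short, in line with the note that callbacks block streaming progress.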
#on_tool_return(tool, result) ⇒ nil
Called when queued streamed tool work returns.
This callback runs when #wait resolves work that was queued from #on_tool_call, such as values returned by `ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)`.
    # File 'lib/llm/stream.rb', line 110
    def on_tool_return(tool, result)
      nil
    end
#on_transform(ctx, transformer) ⇒ nil
Called before a context transformer rewrites a prompt.
    # File 'lib/llm/stream.rb', line 119
    def on_transform(ctx, transformer)
      nil
    end
#on_transform_finish(ctx, transformer) ⇒ nil
Called after a context transformer finishes rewriting a prompt.
    # File 'lib/llm/stream.rb', line 128
    def on_transform_finish(ctx, transformer)
      nil
    end
#queue ⇒ LLM::Stream::Queue
Returns a lazily-initialized queue for tool results or spawned work.
    # File 'lib/llm/stream.rb', line 42
    def queue
      @queue ||= Queue.new(self)
    end
#wait ⇒ Array<LLM::Function::Return>
Waits for queued tool work to finish and returns function results. Any arguments passed are ignored, because queued work is awaited according to the task types already present in the queue.
    # File 'lib/llm/stream.rb', line 51
    def wait(*)
      queue.wait
    end
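The queue-then-wait flow can be approximated with plain threads. This is a sketch under stated assumptions, not the LLM::Stream::Queue implementation: `SketchStream`, `enqueue`, and `log` are hypothetical names, and the real queue may resolve fibers or tasks rather than only threads.

```ruby
# Hypothetical stand-in showing the queue/wait/on_tool_return flow.
class SketchStream
  attr_reader :log

  def initialize
    @pending = [] # [name, Thread] pairs started from a callback
    @log = []
  end

  # Starts the work in a thread and returns immediately.
  def enqueue(name, &work)
    @pending << [name, Thread.new(&work)]
    nil
  end

  # Fired once per finished piece of work.
  def on_tool_return(name, result)
    @log << "#{name} returned #{result}"
    nil
  end

  # Accepts and ignores arguments, as #wait above does. Blocks until each
  # queued thread finishes, fires the callback, and collects the results.
  def wait(*)
    results = @pending.map do |name, thread|
      result = thread.value
      on_tool_return(name, result)
      result
    end
    @pending.clear
    results
  end
end

stream = SketchStream.new
stream.enqueue("add") { 1 + 2 }
stream.enqueue("upcase") { "ok".upcase }
p stream.wait(:thread)
```

`Thread#value` joins the thread and returns the block's result, so results come back in the order the work was queued.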