Class: LLM::Stream

Inherits: Object
Defined in:
lib/llm/stream.rb,
lib/llm/stream/queue.rb

Overview

Note:

The `on_*` callbacks run inline with the streaming parser. They therefore block streaming progress and should generally return as quickly as possible.

The LLM::Stream class provides the callback interface for streamed model output in llm.rb.

A stream object can be an instance of LLM::Stream or a subclass that overrides the callbacks it needs. For basic streaming, llm.rb also accepts any object that implements `#<<`. #queue provides a small helper for collecting asynchronous tool work started from a callback, and #tool_not_found returns an in-band tool error when a streamed tool cannot be resolved.

The most common callback is #on_content, which also maps to #<<. Providers may also call #on_reasoning_content and #on_tool_call when that data is available. Runtime features such as context compaction may also emit lifecycle callbacks like #on_compaction.
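Because llm.rb accepts any object that implements `#<<` as a stream, the simplest sink needs no subclassing at all. The class below is a self-contained illustration (not part of llm.rb) of such a minimal sink that collects streamed chunks:

```ruby
# A minimal stand-in sink. llm.rb only requires the stream object to
# respond to #<<, so a plain Ruby object works for basic streaming.
class ChunkCollector
  attr_reader :chunks

  def initialize
    @chunks = []
  end

  # Called with each chunk of assistant-visible text; returning self
  # keeps the usual << chaining convention.
  def <<(content)
    @chunks << content
    self
  end
end

sink = ChunkCollector.new
sink << "Hello, " << "world"
sink.chunks.join # => "Hello, world"
```

A subclass of LLM::Stream becomes useful once you also need the tool and reasoning callbacks described below.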

Defined Under Namespace

Classes: Queue


Instance Method Details

#ctx ⇒ LLM::Context?

Returns the current context, if one was attached to the stream.

Returns:

  • (LLM::Context, nil)

# File 'lib/llm/stream.rb', line 36

def ctx
  extra[:ctx]
end

#extra ⇒ Hash

Returns extra context associated with the current streamed request.

Returns:

  • (Hash)


# File 'lib/llm/stream.rb', line 29

def extra
  @extra ||= LLM::Object.from({})
end

#find_tool(name) ⇒ LLM::Function?

Resolves a streamed tool call against the current request tools first, then falls back to the global function registry.

Parameters:

  • name (String)

Returns:

  • (LLM::Function, nil)

# File 'lib/llm/stream.rb', line 165

def find_tool(name)
  tool = tools.find do |candidate|
    candidate_name =
      if candidate.respond_to?(:function)
        candidate.function.name
      else
        candidate.name
      end
    candidate_name.to_s == name.to_s
  end
  tool&.then { _1.respond_to?(:function) ? _1.function : _1 } ||
    LLM::Function.find_by_name(name)
end
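The name comparison above normalizes both sides with `#to_s`, so Symbol- and String-named tools resolve through the same lookup. A stand-in sketch (Struct in place of a real tool definition, not the llm.rb type):

```ruby
# Stand-in for the request-local tool list: names are compared after
# #to_s, so a Symbol-named tool matches a String query and vice versa.
ToolDef = Struct.new(:name)

tools = [ToolDef.new(:system_prompt), ToolDef.new("weather")]
hit = tools.find { |t| t.name.to_s == "system_prompt" }
hit.name # => :system_prompt
```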

#on_compaction(ctx, compactor) ⇒ nil

Called before a context compaction starts.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 120

def on_compaction(ctx, compactor)
  nil
end

#on_compaction_finish(ctx, compactor) ⇒ nil

Called after a context compaction finishes.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 129

def on_compaction_finish(ctx, compactor)
  nil
end

#on_content(content) ⇒ nil Also known as: <<

Called when visible assistant output is streamed.

Parameters:

  • content (String)

    A chunk of assistant-visible text.

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 63

def on_content(content)
  nil
end

#on_reasoning_content(content) ⇒ nil

Called when reasoning output is streamed separately from visible content.

Parameters:

  • content (String)

    A chunk of reasoning text.

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 73

def on_reasoning_content(content)
  nil
end

#on_tool_call(tool, error) ⇒ nil

Note:

A stream implementation may start tool execution here, for example by pushing `ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)` onto #queue. Mixed strategies can also be selected per tool, such as `tool.mcp? ? ctx.spawn(tool, :task) : ctx.spawn(tool, :ractor)`. When a streamed tool cannot be resolved, `error` is passed as an LLM::Function::Return. It can be sent back to the model, allowing the tool-call path to recover and the session to continue. Streamed tool resolution prefers the current request tools, so LLM.function, MCP tools, bound tool instances, and normal LLM::Tool classes can all resolve through the same request-local path. The current `:ractor` mode is for class-based tools and does not support MCP tools.

Called when a streamed tool call has been fully constructed.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 97

def on_tool_call(tool, error)
  nil
end
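The spawn-from-a-callback pattern described in the note can be sketched without the gem. `MiniStream` below is a hypothetical stand-in for an LLM::Stream subclass, with a plain Thread in place of `ctx.spawn(tool, :thread)`:

```ruby
# MiniStream is NOT part of llm.rb; it only mirrors the callback shape.
class MiniStream
  def queue
    @queue ||= []
  end

  # Start tool work asynchronously instead of blocking the parser;
  # Thread.new stands in for ctx.spawn(tool, :thread).
  def on_tool_call(tool, error)
    queue << Thread.new { tool.call }
    nil
  end

  # Stand-in for #wait(:thread): resolve the queued work in order.
  def wait
    queue.map(&:value)
  end
end

weather = -> { {temp: 21} }   # stand-in for a resolved tool
stream = MiniStream.new
stream.on_tool_call(weather, nil)
stream.wait # => [{temp: 21}]
```

The point of the pattern is that `on_tool_call` returns immediately while the tool runs, and results are collected once streaming finishes.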

#on_tool_return(tool, result) ⇒ nil

Note:

This callback runs when #wait resolves work that was queued from #on_tool_call, such as values returned by `ctx.spawn(tool, :thread)`, `ctx.spawn(tool, :fiber)`, or `ctx.spawn(tool, :task)`.

Called when queued streamed tool work returns.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 111

def on_tool_return(tool, result)
  nil
end

#queue ⇒ LLM::Stream::Queue

Returns a lazily-initialized queue for tool results or spawned work.

Returns:

  • (LLM::Stream::Queue)

# File 'lib/llm/stream.rb', line 43

def queue
  @queue ||= Queue.new(self)
end

#tool_not_found(tool) ⇒ LLM::Function::Return

Note:

This is mainly useful as a fallback from #on_tool_call. It should be uncommon in normal use, since streamed tool callbacks only run for tools already defined in the context.

Returns a function return describing a streamed tool that could not be resolved.

Parameters:

Returns:

  • (LLM::Function::Return)

# File 'lib/llm/stream.rb', line 145

def tool_not_found(tool)
  LLM::Function::Return.new(tool.id, tool.name, {
    error: true, type: LLM::NoSuchToolError.name, message: "tool not found"
  })
end
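The fallback pattern from the note can be sketched with plain Ruby objects standing in for the llm.rb types (`ReturnValue` is a hypothetical stand-in for LLM::Function::Return, and the error type is written as a literal string rather than `LLM::NoSuchToolError.name`):

```ruby
# Stand-ins only; neither struct is the real llm.rb type.
ReturnValue = Struct.new(:id, :name, :value)
UnknownTool = Struct.new(:id, :name)

# Mirrors #tool_not_found: instead of raising, build an in-band error
# payload that can be sent back to the model so the session continues.
def tool_not_found(tool)
  ReturnValue.new(tool.id, tool.name, {
    error: true, type: "LLM::NoSuchToolError", message: "tool not found"
  })
end

ret = tool_not_found(UnknownTool.new("call_1", "missing_tool"))
ret.value[:error] # => true
```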

#tools ⇒ Array<LLM::Function, LLM::Tool>

Returns the tool definitions available for the current streamed request. This prefers request-local tools attached to the stream and falls back to the current context defaults when present.

Returns:

  • (Array<LLM::Function, LLM::Tool>)

# File 'lib/llm/stream.rb', line 156

def tools
  extra[:tools] || ctx&.params&.dig(:tools) || []
end

#wait(strategy) ⇒ Array<LLM::Function::Return>

Waits for queued tool work to finish and returns function results.

Parameters:

  • strategy (Symbol)

    The concurrency strategy the work was spawned with (:thread, :fiber, :task, or :ractor).

Returns:

  • (Array<LLM::Function::Return>)

# File 'lib/llm/stream.rb', line 52

def wait(strategy)
  queue.wait(strategy)
end
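The queue-then-wait flow can be sketched in isolation: work is queued during streaming (typically from #on_tool_call) and resolved in one place after the parser finishes. Plain Threads stand in for work spawned with the `:thread` strategy:

```ruby
# Work queued during streaming; Thread.new stands in for
# ctx.spawn(tool, :thread) pushed onto the stream's #queue.
queued = []
queued << Thread.new { sleep 0.01; :result_a }
queued << Thread.new { :result_b }

# Stand-in for wait(:thread): block until every queued item finishes
# and collect the results in queue order.
results = queued.map(&:value)
results # => [:result_a, :result_b]
```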