Class: LLM::Stream

Inherits:
Object
Defined in:
lib/llm/stream.rb,
lib/llm/stream/queue.rb

Overview

Note:

The `on_*` callbacks run inline with the streaming parser. They therefore block streaming progress and should generally return as quickly as possible.

The LLM::Stream class provides the callback interface for streamed model output in llm.rb.

A stream object can be an instance of LLM::Stream or a subclass that overrides the callbacks it needs. For basic streaming, llm.rb also accepts any object that implements `#<<`. #queue provides a small helper for collecting asynchronous tool work started from a callback, and #tool_not_found returns an in-band tool error when a streamed tool cannot be resolved.

The most common callback is #on_content, which also maps to #<<. Providers may also call #on_reasoning_content and #on_tool_call when that data is available.
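Because basic streaming only requires an object that responds to `#<<`, almost any appendable sink works. The following is a minimal sketch of that contract using a hypothetical `ChunkCollector` class, not the llm.rb API itself:

```ruby
# Any object that responds to #<< can receive streamed chunks.
# ChunkCollector is a hypothetical stand-in for such a sink.
class ChunkCollector
  attr_reader :chunks

  def initialize
    @chunks = []
  end

  # Mirrors LLM::Stream#on_content, which is aliased as #<<.
  def <<(content)
    @chunks << content
    nil
  end
end

sink = ChunkCollector.new
# Simulate a provider delivering three chunks:
["Hello", ", ", "world"].each { |chunk| sink << chunk }
sink.chunks.join # => "Hello, world"
```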

Defined Under Namespace

Classes: Queue

Public callbacks

Error handlers

Instance Method Summary

Instance Method Details

#extra ⇒ Hash

Returns extra context associated with the current streamed request.

Returns:

  • (Hash)


# File 'lib/llm/stream.rb', line 28

def extra
  @extra ||= LLM::Object.from({})
end

#on_content(content) ⇒ nil Also known as: <<

Called when visible assistant output is streamed.

Parameters:

  • content (String)

    A chunk of assistant-visible text.

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 55

def on_content(content)
  nil
end

#on_reasoning_content(content) ⇒ nil

Called when reasoning output is streamed separately from visible content.

Parameters:

  • content (String)

    A chunk of reasoning text.

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 65

def on_reasoning_content(content)
  nil
end
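A subclass can route visible and reasoning output into separate buffers. A sketch under the assumption that the base class's callbacks are no-ops, using a stand-in `Stream` class rather than LLM::Stream itself:

```ruby
# Stand-in base class with the same no-op callbacks as LLM::Stream.
class Stream
  def on_content(content) = nil
  def on_reasoning_content(content) = nil
end

# Collects visible output and reasoning output separately.
class SplitStream < Stream
  attr_reader :text, :reasoning

  def initialize
    @text = +""
    @reasoning = +""
  end

  # Both callbacks run inline with the parser, so they only append.
  def on_content(content)
    @text << content
    nil
  end

  def on_reasoning_content(content)
    @reasoning << content
    nil
  end
end

stream = SplitStream.new
stream.on_reasoning_content("thinking...")
stream.on_content("Final answer")
stream.text      # => "Final answer"
stream.reasoning # => "thinking..."
```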

#on_tool_call(tool, error) ⇒ nil

Note:

A stream implementation may start tool execution here, for example by pushing `tool.spawn(:thread)`, `tool.spawn(:fiber)`, or `tool.spawn(:task)` onto #queue. Mixed strategies can also be selected per tool, such as `tool.mcp? ? tool.spawn(:task) : tool.spawn(:ractor)`. When a streamed tool cannot be resolved, `error` is passed as an LLM::Function::Return that can be sent back to the model, allowing the tool-call path to recover and the session to continue. Tool resolution depends on Function.registry, which includes LLM::Tool subclasses (including MCP tools) but not functions defined with LLM.function. The current `:ractor` mode is for class-based tools and does not support MCP tools.

Called when a streamed tool call has been fully constructed.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 88

def on_tool_call(tool, error)
  nil
end

#on_tool_return(tool, result) ⇒ nil

Note:

This callback runs when #wait resolves work that was queued from #on_tool_call, such as values returned by `tool.spawn(:thread)`, `tool.spawn(:fiber)`, or `tool.spawn(:task)`.

Called when queued streamed tool work returns.

Parameters:

Returns:

  • (nil)


# File 'lib/llm/stream.rb', line 102

def on_tool_return(tool, result)
  nil
end

#queue ⇒ LLM::Stream::Queue

Returns a lazily-initialized queue for tool results or spawned work.

Returns:

  • (LLM::Stream::Queue)

# File 'lib/llm/stream.rb', line 35

def queue
  @queue ||= Queue.new(self)
end

#tool_not_found(tool) ⇒ LLM::Function::Return

Note:

This is mainly useful as a fallback from #on_tool_call. It should be uncommon in normal use, since streamed tool callbacks only run for tools already defined in the context.

Returns a function return describing a streamed tool that could not be resolved.

Parameters:

Returns:

  • (LLM::Function::Return)

# File 'lib/llm/stream.rb', line 118

def tool_not_found(tool)
  LLM::Function::Return.new(tool.id, tool.name, {
    error: true, type: LLM::NoSuchToolError.name, message: "tool not found"
  })
end
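The error payload has a simple shape. The following sketch uses Struct stand-ins for LLM::Function::Return and the tool object (assumption: the real class is constructed from an id, a name, and a value hash, as in the source shown above):

```ruby
# Struct stand-ins; the real classes live in llm.rb.
FunctionReturn = Struct.new(:id, :name, :value)
UnknownTool    = Struct.new(:id, :name)

# Builds an in-band error for a tool that could not be resolved,
# shaped like LLM::Stream#tool_not_found.
def tool_not_found(tool)
  FunctionReturn.new(tool.id, tool.name, {
    error: true, type: "LLM::NoSuchToolError", message: "tool not found"
  })
end

ret = tool_not_found(UnknownTool.new("call_123", "no_such_tool"))
ret.name          # => "no_such_tool"
ret.value[:error] # => true
```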

#wait(strategy) ⇒ Array<LLM::Function::Return>

Waits for queued tool work to finish and returns function results.

Parameters:

  • strategy (Symbol)

    The concurrency strategy to use

Returns:

  • (Array<LLM::Function::Return>)

# File 'lib/llm/stream.rb', line 44

def wait(strategy)
  queue.wait(strategy)
end
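The queue/wait cycle can be pictured with plain Threads: callbacks push spawned work onto the queue during streaming, and #wait drains it once the stream ends. A self-contained sketch, where `MiniQueue` is a stand-in supporting only the `:thread` strategy (llm.rb's queue also understands `:fiber`, `:task`, and `:ractor`):

```ruby
# Stand-in for LLM::Stream::Queue, supporting only :thread.
class MiniQueue
  def initialize
    @work = []
  end

  # Callbacks (such as #on_tool_call) push spawned work here.
  def <<(thread)
    @work << thread
    self
  end

  # Join every queued thread and collect its return value,
  # mirroring LLM::Stream#wait, which delegates to the queue.
  def wait(strategy)
    raise ArgumentError, "only :thread is sketched here" unless strategy == :thread
    @work.map(&:value)
  end
end

queue = MiniQueue.new
# Simulate two tool calls spawned during streaming:
queue << Thread.new { 6 * 7 }
queue << Thread.new { "tool result" }
queue.wait(:thread) # => [42, "tool result"]
```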