Class: Brute::AgentStream

Inherits: LLM::Stream
  - Object
  - LLM::Stream
  - Brute::AgentStream

Defined in:
  lib/brute/agent_stream.rb
Overview
Bridges llm.rb’s streaming callbacks to the host application.
Text and reasoning chunks fire immediately as the LLM generates them. Tool calls are collected but NOT executed — execution is deferred to the orchestrator after the stream completes. This ensures text is never concurrent with tool execution.
After the stream finishes, the orchestrator reads pending_tools to dispatch all tool calls concurrently, then fires on_tool_call_start once with the full batch.
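The record-now, execute-later contract can be illustrated with a minimal stand-in that mirrors the source listings below. `FakeTool`, `MiniAgentStream`, and the simulated callback sequence are illustrative assumptions, not part of llm.rb's API:

```ruby
# FakeTool stands in for the tool objects llm.rb passes to on_tool_call.
FakeTool = Struct.new(:id, :name, :arguments)

# Minimal sketch of AgentStream's recording behavior.
class MiniAgentStream
  attr_reader :pending_tool_calls, :pending_tools

  def initialize(on_content: nil)
    @on_content = on_content
    @pending_tool_calls = []
    @pending_tools = []
  end

  # Text chunks fire immediately as they arrive.
  def on_content(text)
    @on_content&.call(text)
  end

  # Tool calls are recorded only; execution is deferred.
  def on_tool_call(tool, error)
    @pending_tool_calls << { id: tool.id, name: tool.name, arguments: tool.arguments }
    @pending_tools << [tool, error]
  end

  def clear_pending_tools!
    @pending_tools.clear
  end
end

chunks = []
stream = MiniAgentStream.new(on_content: ->(t) { chunks << t })

# Simulated streaming callbacks, in arrival order:
stream.on_content("Hello ")
stream.on_tool_call(FakeTool.new("call_1", "search", { q: "ruby" }), nil)
stream.on_content("world")

# After the stream completes, the orchestrator drains the batch:
batch = stream.pending_tools.dup
stream.clear_pending_tools!
```

Note that the tool call interleaved between the two text chunks does not run mid-stream; it only becomes visible to the orchestrator once the stream has finished.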
Instance Attribute Summary

- #on_question ⇒ Object (readonly)
  The on_question callback, needed by the orchestrator to set thread/fiber-locals before tool execution.

- #pending_tool_calls ⇒ Object (readonly)
  Tool call metadata recorded during streaming, used by ToolUseGuard when ctx.functions is empty (nil-choice bug in llm.rb).

- #pending_tools ⇒ Object (readonly)
  Deferred tool/error pairs, [[LLM::Function, error_or_nil], …]. The orchestrator reads these after the stream completes.
Instance Method Summary

- #clear_pending_tool_calls! ⇒ Object
  Clear only the tool call metadata (used by ToolUseGuard after it has consumed the data for synthetic message injection).

- #clear_pending_tools! ⇒ Object
  Clear the deferred execution queue after the orchestrator has consumed and dispatched all tool calls.

- #initialize(on_content: nil, on_reasoning: nil, on_question: nil) ⇒ AgentStream (constructor)
  A new instance of AgentStream.

- #on_content(text) ⇒ Object

- #on_reasoning_content(text) ⇒ Object

- #on_tool_call(tool, error) ⇒ Object
  Called by llm.rb per tool as it arrives during streaming.
Constructor Details
#initialize(on_content: nil, on_reasoning: nil, on_question: nil) ⇒ AgentStream
Returns a new instance of AgentStream.
# File 'lib/brute/agent_stream.rb', line 29

def initialize(on_content: nil, on_reasoning: nil, on_question: nil)
  @on_content = on_content
  @on_reasoning = on_reasoning
  @on_question = on_question
  @pending_tool_calls = []
  @pending_tools = []
end
Instance Attribute Details
#on_question ⇒ Object (readonly)
The on_question callback, needed by the orchestrator to set thread/fiber-locals before tool execution.
# File 'lib/brute/agent_stream.rb', line 39

def on_question
  @on_question
end
#pending_tool_calls ⇒ Object (readonly)
Tool call metadata recorded during streaming, used by ToolUseGuard when ctx.functions is empty (nil-choice bug in llm.rb).
# File 'lib/brute/agent_stream.rb', line 23

def pending_tool_calls
  @pending_tool_calls
end
#pending_tools ⇒ Object (readonly)
Deferred tool/error pairs, [[LLM::Function, error_or_nil], …]. The orchestrator reads these after the stream completes.
# File 'lib/brute/agent_stream.rb', line 27

def pending_tools
  @pending_tools
end
Instance Method Details
#clear_pending_tool_calls! ⇒ Object
Clear only the tool call metadata (used by ToolUseGuard after it has consumed the data for synthetic message injection).
# File 'lib/brute/agent_stream.rb', line 58

def clear_pending_tool_calls!
  @pending_tool_calls.clear
end
#clear_pending_tools! ⇒ Object
Clear the deferred execution queue after the orchestrator has consumed and dispatched all tool calls.
# File 'lib/brute/agent_stream.rb', line 64

def clear_pending_tools!
  @pending_tools.clear
end
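A plausible orchestrator-side drain might look like the following sketch. The `drain_pending_tools` helper, thread-per-tool dispatch, and block-based runner are assumptions for illustration, not part of the documented API:

```ruby
# Hypothetical drain loop: copy the deferred [tool, error] pairs, clear the
# queue, then dispatch every tool concurrently and collect the results.
def drain_pending_tools(pending, &run_tool)
  pairs = pending.dup
  pending.clear
  pairs
    .map { |tool, error| Thread.new { error ? { error: error.message } : run_tool.call(tool) } }
    .map(&:value)
end

# Illustrative stand-in for the recorded tool objects.
Tool = Struct.new(:id, :name, :arguments)
pending = [
  [Tool.new("c1", "add", { a: 1, b: 2 }), nil],
  [Tool.new("c2", "bad", {}), RuntimeError.new("malformed arguments")]
]

results = drain_pending_tools(pending) { |t| t.arguments[:a] + t.arguments[:b] }
```

Clearing the queue before dispatch keeps a subsequent stream turn from re-reading stale pairs, which matches the documented intent of calling clear_pending_tools! after consumption.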
#on_content(text) ⇒ Object
# File 'lib/brute/agent_stream.rb', line 41

def on_content(text)
  @on_content&.call(text)
end
#on_reasoning_content(text) ⇒ Object
# File 'lib/brute/agent_stream.rb', line 45

def on_reasoning_content(text)
  @on_reasoning&.call(text)
end
#on_tool_call(tool, error) ⇒ Object
Called by llm.rb per tool as it arrives during streaming. Records only — no execution, no threads, no queue pushes.
# File 'lib/brute/agent_stream.rb', line 51

def on_tool_call(tool, error)
  @pending_tool_calls << { id: tool.id, name: tool.name, arguments: tool.arguments }
  @pending_tools << [tool, error]
end