Class: Brute::AgentStream

Inherits:
LLM::Stream
  • Object
Defined in:
lib/brute/agent_stream.rb

Overview

Bridges llm.rb’s streaming callbacks to the host application.

Text and reasoning chunks fire immediately as the LLM generates them. Tool calls are collected but NOT executed; execution is deferred to the orchestrator until the stream completes, so streamed text never overlaps with tool execution.

After the stream finishes, the orchestrator reads pending_tools to dispatch all tool calls concurrently, then fires on_tool_call_start once with the full batch.
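The deferral described above can be illustrated with a minimal sketch. `DeferralSketchStream` and `DeferralSketchTool` are hypothetical stand-ins written for this example (the real class inherits from LLM::Stream and receives LLM::Function objects); only the method bodies mirror the source shown further down this page.

```ruby
# Stand-in that mirrors the documented method bodies. It does NOT inherit
# from LLM::Stream, so it runs without llm.rb installed.
class DeferralSketchStream
  attr_reader :pending_tools

  def initialize(on_content: nil)
    @on_content = on_content
    @pending_tools = []
  end

  # Text is forwarded to the host callback immediately.
  def on_content(text)
    @on_content&.call(text)
  end

  # Tool calls are only recorded, never executed here.
  def on_tool_call(tool, error)
    @pending_tools << [tool, error]
  end
end

# Hypothetical stand-in for LLM::Function.
DeferralSketchTool = Struct.new(:id, :name, :arguments)

events = []
stream = DeferralSketchStream.new(on_content: ->(text) { events << [:text, text] })

# Simulate the callbacks llm.rb would make while streaming one response.
stream.on_content("Let me check ")
stream.on_tool_call(DeferralSketchTool.new("call_1", "read_file", { "path" => "README.md" }), nil)
stream.on_content("that file.")

# Only after the stream ends does the orchestrator look at the tools.
stream.pending_tools.each { |tool, _error| events << [:tool, tool.name] }
```

In this sketch both text chunks land in `events` before the tool entry, even though the tool call arrived between them.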


Constructor Details

#initialize(on_content: nil, on_reasoning: nil, on_question: nil) ⇒ AgentStream

Returns a new instance of AgentStream.



# File 'lib/brute/agent_stream.rb', line 29

def initialize(on_content: nil, on_reasoning: nil, on_question: nil)
  @on_content = on_content
  @on_reasoning = on_reasoning
  @on_question = on_question
  @pending_tool_calls = []
  @pending_tools = []
end
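Because every callback defaults to nil and is invoked with the safe-navigation operator, a host can pass only the callbacks it cares about. The sketch below shows that behavior with a hypothetical stand-in class, `CallbackSketchStream`, which copies the constructor and the two text callbacks from this page but does not inherit from LLM::Stream.

```ruby
# Stand-in copying the documented constructor and text callbacks.
class CallbackSketchStream
  def initialize(on_content: nil, on_reasoning: nil, on_question: nil)
    @on_content = on_content
    @on_reasoning = on_reasoning
    @on_question = on_question
  end

  def on_content(text)
    @on_content&.call(text)
  end

  def on_reasoning_content(text)
    @on_reasoning&.call(text)
  end
end

# No callbacks: chunks are silently dropped and the call returns nil.
silent = CallbackSketchStream.new
silent_result = silent.on_content("ignored")

# Two callbacks: answer text and reasoning text go to separate sinks.
output = +""
thoughts = []
verbose = CallbackSketchStream.new(
  on_content: ->(text) { output << text },
  on_reasoning: ->(text) { thoughts << text }
)
verbose.on_content("Hello")
verbose.on_reasoning_content("thinking")
```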

Instance Attribute Details

#on_question ⇒ Object (readonly)

The on_question callback, needed by the orchestrator to set thread/fiber-locals before tool execution.



# File 'lib/brute/agent_stream.rb', line 39

def on_question
  @on_question
end

#pending_tool_calls ⇒ Object (readonly)

Tool call metadata recorded during streaming, used by ToolUseGuard when ctx.functions is empty (nil-choice bug in llm.rb).



# File 'lib/brute/agent_stream.rb', line 23

def pending_tool_calls
  @pending_tool_calls
end

#pending_tools ⇒ Object (readonly)

Deferred tool/error pairs: [(LLM::Function, error_or_nil), …]. The orchestrator reads these after the stream completes.



# File 'lib/brute/agent_stream.rb', line 27

def pending_tools
  @pending_tools
end
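One way an orchestrator might drain these pairs is sketched below. This is not the orchestrator's actual code: `DispatchSketchTool` and its `run` method are hypothetical (the real LLM::Function has its own invocation API), a plain array stands in for `stream.pending_tools`, and threads are just one possible way to dispatch concurrently.

```ruby
# Hypothetical stand-in for LLM::Function; `run` is invented for this sketch.
DispatchSketchTool = Struct.new(:id, :name, :arguments) do
  def run
    "#{name} ok"
  end
end

# Stand-in for stream.pending_tools after the stream has completed.
pending_tools = [
  [DispatchSketchTool.new("call_1", "read_file", {}), nil],
  [DispatchSketchTool.new("call_2", "bad_tool", {}), ArgumentError.new("unknown tool")]
]

# Dispatch every pair concurrently. A pair that already carries an error
# is reported without running the tool.
threads = pending_tools.map do |tool, error|
  Thread.new do
    if error
      { id: tool.id, error: error.message }
    else
      { id: tool.id, result: tool.run }
    end
  end
end

# Thread#value joins each thread; results keep the original call order.
results = threads.map(&:value)

# Mirrors clear_pending_tools! once everything has been dispatched.
pending_tools.clear
```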

Instance Method Details

#clear_pending_tool_calls! ⇒ Object

Clear only the tool call metadata (used by ToolUseGuard after it has consumed the data for synthetic message injection).



# File 'lib/brute/agent_stream.rb', line 58

def clear_pending_tool_calls!
  @pending_tool_calls.clear
end
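The two lists are cleared independently, so a consumer of the metadata does not disturb the execution queue. The sketch below demonstrates this with hypothetical stand-ins (`GuardSketchStream`, `GuardSketchTool`); the shape of the `synthetic` hash is an assumption made for illustration and is not ToolUseGuard's real message format.

```ruby
# Stand-in copying the documented recording and clearing methods.
class GuardSketchStream
  attr_reader :pending_tool_calls, :pending_tools

  def initialize
    @pending_tool_calls = []
    @pending_tools = []
  end

  def on_tool_call(tool, error)
    @pending_tool_calls << { id: tool.id, name: tool.name, arguments: tool.arguments }
    @pending_tools << [tool, error]
  end

  def clear_pending_tool_calls!
    @pending_tool_calls.clear
  end
end

# Hypothetical stand-in for LLM::Function.
GuardSketchTool = Struct.new(:id, :name, :arguments)

stream = GuardSketchStream.new
stream.on_tool_call(GuardSketchTool.new("call_1", "shell", { "cmd" => "ls" }), nil)

# Copy the metadata before clearing; this message shape is hypothetical.
synthetic = { role: "assistant", tool_calls: stream.pending_tool_calls.map(&:dup) }

# Clears only the metadata; the deferred execution queue is untouched.
stream.clear_pending_tool_calls!
```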

#clear_pending_tools! ⇒ Object

Clear the deferred execution queue after the orchestrator has consumed and dispatched all tool calls.



# File 'lib/brute/agent_stream.rb', line 64

def clear_pending_tools!
  @pending_tools.clear
end

#on_content(text) ⇒ Object



# File 'lib/brute/agent_stream.rb', line 41

def on_content(text)
  @on_content&.call(text)
end

#on_reasoning_content(text) ⇒ Object



# File 'lib/brute/agent_stream.rb', line 45

def on_reasoning_content(text)
  @on_reasoning&.call(text)
end

#on_tool_call(tool, error) ⇒ Object

Called by llm.rb per tool as it arrives during streaming. Records only — no execution, no threads, no queue pushes.



# File 'lib/brute/agent_stream.rb', line 51

def on_tool_call(tool, error)
  @pending_tool_calls << { id: tool.id, name: tool.name, arguments: tool.arguments }
  @pending_tools << [tool, error]
end