Class: Brute::Loop::AgentTurn::Base
- Defined in:
- lib/brute/loop/agent_turn.rb
Overview
The default implementation. Works for any provider. Provider-specific subclasses override supported_messages and anything else that differs.
LLM::Context is built fresh for each pipeline call by the LLMCall middleware. The agent turn owns the conversation state via env (an Array<LLM::Message>).
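As a sketch of that state, env can be pictured as a plain Hash. This is illustrative only: the keys below are inferred from the fields this page references (env[:pending_functions], env[:should_exit], env[:input], env[:tool_results]); the real env may carry more.

```ruby
# Illustrative env sketch -- keys inferred from this page, not exhaustive.
env = {
  messages: [],            # Array<LLM::Message>, the conversation state
  input: nil,              # what the next pipeline call sends to the LLM
  tool_results: nil,       # [[name, value], ...] fed back after tools run
  pending_functions: [],   # tool calls parsed from a non-streaming response
  should_exit: false,      # set truthy to break out of the tool loop
}

env[:should_exit]        # => false
env[:pending_functions]  # => []
```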
Supports two modes:
- Non-streaming (default): text arrives after the LLM call completes, on_content fires post-hoc via the LLMCall middleware, and tool calls come from env[:pending_functions].
- Streaming: enabled when on_content or on_reasoning callbacks are present. Text and reasoning fire incrementally via AgentStream; tool calls are deferred during the stream and collected afterward from the stream's pending_tools.
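The mode selection can be sketched as a one-line predicate. This is a minimal sketch assuming only the rule stated above; streaming? is a hypothetical helper for illustration, not a method of the class.

```ruby
# Hypothetical helper: streaming is on iff a content or reasoning
# callback was supplied, matching the rule described above.
def streaming?(callbacks)
  !!(callbacks[:on_content] || callbacks[:on_reasoning])
end

streaming?({})                          # => false
streaming?(on_reasoning: ->(t) {})      # => true
streaming?(on_tool_result: ->(n, r) {}) # => false (tool callbacks alone don't stream)
```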
Callbacks:
on_content: ->(text) {} # text chunk (streaming) or full text (non-streaming)
on_reasoning: ->(text) {} # reasoning/thinking chunk (streaming only)
on_tool_call_start: ->(batch) {} # [{name:, arguments:}, ...] before tool execution
on_tool_result: ->(name, r) {} # per-tool, after each completes
on_question: ->(questions, queue) {} # interactive; push answers onto queue
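A hedged wiring example: the handler bodies here are illustrative, only the callback shapes come from the list above. The on_question contract is the interesting one, since the turn blocks until one answer per question is pushed onto the queue.

```ruby
# Illustrative callbacks hash mirroring the documented shapes.
callbacks = {
  on_content:         ->(text) { print text },
  on_tool_call_start: ->(batch) { batch.map { |t| t[:name] } },
  on_tool_result:     ->(name, r) { [name, r] },
  # Interactive: push one answer per question onto the queue.
  on_question:        ->(questions, queue) { questions.each { |q| queue << "yes" } },
}

# Simulate the interactive round-trip the agent turn performs:
queue = Queue.new
callbacks[:on_question].call(["Overwrite config.yml?"], queue)
answer = queue.pop   # the turn would block here until an answer arrives
```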
Constant Summary

MAX_ITERATIONS = 100
Constants inherited from Step
Instance Attribute Summary
- #agent ⇒ Object (readonly)
  Returns the value of attribute agent.
- #session ⇒ Object (readonly)
  Returns the value of attribute session.
Attributes inherited from Step
Instance Method Summary
- #initialize(agent:, session:, pipeline:, input: nil, callbacks: {}, **rest) ⇒ Base constructor
  A new instance of Base.
- #perform(task) ⇒ Object
- #supported_messages(messages) ⇒ Object
  Override in subclasses to filter message types per provider.
Methods inherited from Step
#call, #cancel, #error, generate_id, #jobs, #result, #state, #status
Constructor Details
#initialize(agent:, session:, pipeline:, input: nil, callbacks: {}, **rest) ⇒ Base
Returns a new instance of Base.
# File 'lib/brute/loop/agent_turn.rb', line 94

def initialize(agent:, session:, pipeline:, input: nil, callbacks: {}, **rest)
  super(**rest)
  @agent = agent
  @session = session
  @pipeline = pipeline
  @input = input
  @callbacks = callbacks

  # Create streaming bridge when content or reasoning callbacks are
  # present. The stream is passed into env so LLMCall can wire it
  # into each fresh LLM::Context.
  if @callbacks[:on_content] || @callbacks[:on_reasoning]
    @stream = AgentStream.new(
      on_content: @callbacks[:on_content],
      on_reasoning: @callbacks[:on_reasoning],
      on_question: @callbacks[:on_question],
    )
  end
end
Instance Attribute Details
#agent ⇒ Object (readonly)
Returns the value of attribute agent.
# File 'lib/brute/loop/agent_turn.rb', line 92

def agent
  @agent
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
# File 'lib/brute/loop/agent_turn.rb', line 92

def session
  @session
end
Instance Method Details
#perform(task) ⇒ Object
# File 'lib/brute/loop/agent_turn.rb', line 114

def perform(task)
  env = build_env

  # First LLM call
  env[:input] = build_initial_input(@input)
  env[:tool_results] = nil
  response = @pipeline.call(env)

  iterations = 0
  while !env[:should_exit] &&
        (pending = collect_pending_tools(env)).any? &&
        iterations < MAX_ITERATIONS
    # Fire on_tool_call_start with the full batch
    @callbacks[:on_tool_call_start]&.call(
      pending.map { |fn, _| { name: fn.name, arguments: fn.arguments } }
    )

    # Partition: question tools run sequentially on this fiber,
    # all others run in parallel via the sub-queue.
    questions, others = pending.partition { |fn, _| fn.name == "question" }
    results = []

    # Questions first — sequential, blocking, with on_question fiber-local
    questions.each do |fn, err|
      if err
        @callbacks[:on_tool_result]&.call(err.name, result_value(err))
        results << err
      else
        Thread.current[:on_question] = @callbacks[:on_question]
        result = fn.call
        @callbacks[:on_tool_result]&.call(fn.name, result_value(result))
        results << result
      end
    end

    # Others — into the parallel queue
    if others.any?
      errors, executable = others.partition { |_, err| err }

      # Record pre-existing errors (from stream's on_tool_call)
      errors.each do |_, err|
        @callbacks[:on_tool_result]&.call(err.name, result_value(err))
        results << err
      end

      if executable.any?
        tool_steps = executable.map { |fn, _| ToolCallStep.new(function: fn) }
        tool_steps.each { |s| jobs(type: Brute::Queue::ParallelQueue) << s }
        jobs.drain

        tool_steps.each do |s|
          val = s.state == :completed ? s.result : s.error
          @callbacks[:on_tool_result]&.call(s.function.name, result_value(val))
          results << val
        end
      end
    end

    # Feed results back to LLM
    env[:input] = results
    env[:tool_results] = results.filter_map { |r|
      name = r.respond_to?(:name) ? r.name : "unknown"
      [name, result_value(r)]
    }
    response = @pipeline.call(env)

    # Re-create sub-queue for next iteration's tool calls
    @mutex.synchronize { @jobs = nil }
    iterations += 1
  end

  response
end
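The partition step above can be seen in isolation. A self-contained sketch: the Tool struct is a stand-in for the real function objects, and the real pending items are [function, error] pairs rather than bare tools.

```ruby
# Stand-in for the real function objects held in `pending`.
Tool = Struct.new(:name)
pending = [Tool.new("read_file"), Tool.new("question"), Tool.new("grep")]

# Same rule perform uses: "question" tools run sequentially on this
# fiber; everything else goes to the parallel sub-queue.
questions, others = pending.partition { |fn| fn.name == "question" }

questions.map(&:name) # => ["question"]
others.map(&:name)    # => ["read_file", "grep"]
```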
#supported_messages(messages) ⇒ Object
Override in subclasses to filter message types per provider. Default: all messages pass through.
# File 'lib/brute/loop/agent_turn.rb', line 192

def supported_messages(messages)
  messages
end
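As an illustration of the override hook: the subclass name and the role being filtered are hypothetical, and the class is defined standalone here so the sketch runs without the library; only the method signature comes from this page.

```ruby
# Hypothetical provider-specific subclass (standalone, not inheriting
# Base, so this sketch is self-contained).
class StrictProviderTurn
  # Drop message roles this imaginary provider rejects; everything
  # else passes through unchanged, as in the Base default.
  def supported_messages(messages)
    messages.reject { |m| m[:role] == :developer }
  end
end

msgs = [{ role: :user, content: "hi" }, { role: :developer, content: "x" }]
kept = StrictProviderTurn.new.supported_messages(msgs)
kept.map { |m| m[:role] } # => [:user]
```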