Class: Brute::Loop::AgentTurn::Base

Inherits:
Step
  • Object
show all
Defined in:
lib/brute/loop/agent_turn.rb

Overview

The default agent-turn implementation; it works with any provider. Provider-specific subclasses override supported_messages and anything else that differs.

LLM::Context is built fresh for each pipeline call by the LLMCall middleware. The agent turn owns the conversation state via env (an Array<LLM::Message>).

Supports two modes:

Non-streaming (default): text arrives after the LLM call completes,
on_content fires post-hoc via LLMCall middleware, tool calls come
from env[:pending_functions].

Streaming: enabled when on_content or on_reasoning callbacks are
present. Text/reasoning fire incrementally via AgentStream. Tool
calls are deferred during the stream and collected afterward from
the stream's pending_tools.

Callbacks:

on_content:         ->(text) {}     # text chunk (streaming) or full text (non-streaming)
on_reasoning:       ->(text) {}     # reasoning/thinking chunk (streaming only)
on_tool_call_start: ->(batch) {}    # [{name:, arguments:}, ...] before tool execution
on_tool_result:     ->(name, r) {}  # per-tool, after each completes
on_question:        ->(questions, queue) {}  # interactive; push answers onto queue

Direct Known Subclasses

Anthropic, Google, OpenAI

Constant Summary collapse

MAX_ITERATIONS =
100

Constants inherited from Step

Step::STATES

Instance Attribute Summary collapse

Attributes inherited from Step

#id

Instance Method Summary collapse

Methods inherited from Step

#call, #cancel, #error, generate_id, #jobs, #result, #state, #status

Constructor Details

#initialize(agent:, session:, pipeline:, input: nil, callbacks: {}, **rest) ⇒ Base

Returns a new instance of Base.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/brute/loop/agent_turn.rb', line 94

# Builds an agent turn step.
#
# @param agent [Object] the agent owning this turn
# @param session [Object] conversation/session state holder
# @param pipeline [#call] middleware pipeline invoked per LLM call
# @param input [Object, nil] initial user input for the first call
# @param callbacks [Hash] optional hooks (:on_content, :on_reasoning,
#   :on_tool_call_start, :on_tool_result, :on_question)
# @param rest [Hash] forwarded to the Step superclass
def initialize(agent:, session:, pipeline:, input: nil, callbacks: {}, **rest)
  super(**rest)

  @agent     = agent
  @session   = session
  @pipeline  = pipeline
  @input     = input
  @callbacks = callbacks

  # A streaming bridge is only needed when the caller wants incremental
  # text or reasoning; LLMCall wires the stream (passed through env)
  # into each fresh LLM::Context.
  wants_streaming = callbacks[:on_content] || callbacks[:on_reasoning]
  return unless wants_streaming

  @stream = AgentStream.new(
    on_content:   callbacks[:on_content],
    on_reasoning: callbacks[:on_reasoning],
    on_question:  callbacks[:on_question],
  )
end

Instance Attribute Details

#agentObject (readonly)

Returns the value of attribute agent.



92
93
94
# File 'lib/brute/loop/agent_turn.rb', line 92

# @return [Object] the agent driving this turn (read-only)
def agent = @agent

#sessionObject (readonly)

Returns the value of attribute session.



92
93
94
# File 'lib/brute/loop/agent_turn.rb', line 92

# @return [Object] the session this turn belongs to (read-only)
def session = @session

Instance Method Details

#perform(task) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/brute/loop/agent_turn.rb', line 114

# Executes one full agent turn: an initial LLM call followed by a
# bounded tool-execution loop that feeds tool results back to the LLM
# until it stops requesting tools, env[:should_exit] is set, or
# MAX_ITERATIONS is reached.
#
# Question tools run sequentially on this fiber (they block on
# interactive input via the on_question callback); all other tools run
# in parallel as ToolCallStep jobs on a sub-queue.
#
# @param task [Object] supplied by the Step framework; not read in this
#   method body — NOTE(review): confirm against Step#call's contract
# @return [Object] the final pipeline response
def perform(task)
  env = build_env

  # First LLM call
  env[:input] = build_initial_input(@input)
  env[:tool_results] = nil
  response = @pipeline.call(env)

  iterations = 0
  # Loop while the LLM keeps requesting tools; MAX_ITERATIONS caps
  # runaway tool loops, and env[:should_exit] lets middleware abort
  # the turn early.
  while !env[:should_exit] &&
    (pending = collect_pending_tools(env)).any? &&
    iterations < MAX_ITERATIONS

    # Fire on_tool_call_start with the full batch
    @callbacks[:on_tool_call_start]&.call(
      pending.map { |fn, _| { name: fn.name, arguments: fn.arguments } }
    )

    # Partition: question tools run sequentially on this fiber,
    # all others run in parallel via the sub-queue.
    questions, others = pending.partition { |fn, _| fn.name == "question" }

    results = []

    # Questions first — sequential, blocking, with on_question fiber-local
    questions.each do |fn, err|
      if err
        # Pre-existing error attached to this pending tool — report it
        # without invoking the tool.
        @callbacks[:on_tool_result]&.call(err.name, result_value(err))
        results << err
      else
        # Expose the interactive callback via a fiber-local so the
        # tool implementation can reach it while it blocks for input.
        Thread.current[:on_question] = @callbacks[:on_question]
        result = fn.call
        @callbacks[:on_tool_result]&.call(fn.name, result_value(result))
        results << result
      end
    end

    # Others — into the parallel queue
    if others.any?
      errors, executable = others.partition { |_, err| err }

      # Record pre-existing errors (from stream's on_tool_call)
      errors.each do |_, err|
        @callbacks[:on_tool_result]&.call(err.name, result_value(err))
        results << err
      end

      if executable.any?
        # One ToolCallStep per function; drain runs them in parallel,
        # then each step surfaces either its result or its error.
        tool_steps = executable.map { |fn, _| ToolCallStep.new(function: fn) }
        tool_steps.each { |s| jobs(type: Brute::Queue::ParallelQueue) << s }
        jobs.drain

        tool_steps.each do |s|
          val = s.state == :completed ? s.result : s.error
          @callbacks[:on_tool_result]&.call(s.function.name, result_value(val))
          results << val
        end
      end
    end

    # Feed results back to LLM
    env[:input] = results
    # NOTE(review): this filter_map block always returns a two-element
    # array (never nil/false), so it never actually filters — plain map
    # would be equivalent; verify whether result_value was meant to be
    # able to signal "skip".
    env[:tool_results] = results.filter_map { |r|
      name = r.respond_to?(:name) ? r.name : "unknown"
      [name, result_value(r)]
    }
    response = @pipeline.call(env)

    # Re-create sub-queue for next iteration's tool calls
    @mutex.synchronize { @jobs = nil }
    iterations += 1
  end

  response
end

#supported_messages(messages) ⇒ Object

Override in subclasses to filter message types per provider. Default: all messages pass through.



192
193
194
# File 'lib/brute/loop/agent_turn.rb', line 192

# Identity pass-through: the base implementation imposes no per-provider
# restrictions on message types. Subclasses override this to filter.
#
# @param messages [Array<LLM::Message>] candidate messages
# @return [Array<LLM::Message>] the same object, unfiltered
def supported_messages(messages) = messages