Class: Brute::Orchestrator
- Inherits: Object
- Defined in: lib/brute/orchestrator.rb
Overview
The core agent loop. Drives the cycle of:
prompt → LLM → tool calls → execute → send results → repeat
All cross-cutting concerns (retry, compaction, doom loop detection, token tracking, session persistence, tracing, reasoning) are implemented as Rack-style middleware in the Pipeline. The orchestrator is now a thin loop that:
1. Sends input through the pipeline (which wraps the LLM call)
2. Executes any tool calls the LLM requested
3. Repeats until done or a limit is hit
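The Rack-style layering described above can be sketched in a few lines. This is an illustrative example, not Brute's actual middleware classes: `RequestCounter` and the inner lambda are hypothetical stand-ins showing how each middleware wraps the next callable and shares one env hash.

```ruby
# Hypothetical middleware: runs code before delegating to the inner app,
# using the shared env hash to accumulate state across calls.
class RequestCounter
  def initialize(app)
    @app = app
  end

  def call(env)
    env[:metadata][:request_count] = env[:metadata].fetch(:request_count, 0) + 1
    @app.call(env) # delegate to the next middleware (or the LLM call itself)
  end
end

# Hypothetical innermost app: stands in for the real LLM call.
llm_call = ->(env) { "response to #{env[:input]}" }

# Build the pipeline inside-out, exactly as Rack does.
pipeline = RequestCounter.new(llm_call)

env = { input: "hello", metadata: {} }
pipeline.call(env)             # => "response to hello"
env[:metadata][:request_count] # => 1
```

Because every concern shares the same env hash, middleware such as retry or token tracking can both pre-process the input and post-process the response without the orchestrator knowing they exist.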
Tool execution is always deferred until after the LLM response (including streaming) completes. Tools then run concurrently with each other via Async::Barrier. on_tool_call_start fires once with the full batch before execution begins; on_tool_result fires per-tool as each finishes.
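The batch-then-per-result callback ordering can be sketched with plain threads standing in for Async::Barrier. All names here (`run_tool_batch`, the `pending` tuple shape) are illustrative assumptions, not Brute's API:

```ruby
# Illustrative sketch: tools are collected first, then executed together.
# on_tool_call_start fires once with the whole batch before execution;
# on_tool_result fires per tool as each one finishes.
def run_tool_batch(pending, on_tool_call_start:, on_tool_result:)
  # Fire once, before any tool runs, with the full batch.
  on_tool_call_start.call(pending.map { |t| t[:name] })

  # Run the batch concurrently; joining on every thread is the barrier.
  threads = pending.map do |tool|
    Thread.new do
      value = tool[:block].call               # execute the tool body
      on_tool_result.call(tool[:name], value) # fires per tool, as it finishes
      [tool[:name], value]
    end
  end
  threads.map(&:value) # wait for the whole batch, collect results
end

events = []
run_tool_batch(
  [
    { name: "read", block: -> { "file contents" } },
    { name: "grep", block: -> { "3 matches" } },
  ],
  on_tool_call_start: ->(names) { events << [:start, names] },
  on_tool_result: ->(name, _value) { events << [:result, name] },
)
# events.first is [:start, ["read", "grep"]]; two :result events follow
```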
Constant Summary
- MAX_REQUESTS_PER_TURN = 100
Instance Attribute Summary
- #barrier ⇒ Object (readonly): returns the value of attribute barrier.
- #context ⇒ Object (readonly): returns the value of attribute context.
- #env ⇒ Object (readonly): returns the value of attribute env.
- #message_store ⇒ Object (readonly): returns the value of attribute message_store.
- #pipeline ⇒ Object (readonly): returns the value of attribute pipeline.
- #session ⇒ Object (readonly): returns the value of attribute session.
Instance Method Summary
- #abort! ⇒ Object: cancel any in-flight tool execution.
- #initialize(provider:, model: nil, tools: Brute::TOOLS, cwd: Dir.pwd, session: nil, compactor_opts: {}, reasoning: {}, agent_name: nil, on_content: nil, on_reasoning: nil, on_tool_call_start: nil, on_tool_result: nil, on_question: nil, logger: nil) ⇒ Orchestrator (constructor): a new instance of Orchestrator.
- #run(user_message) ⇒ Object: run a single user turn.
Constructor Details
#initialize(provider:, model: nil, tools: Brute::TOOLS, cwd: Dir.pwd, session: nil, compactor_opts: {}, reasoning: {}, agent_name: nil, on_content: nil, on_reasoning: nil, on_tool_call_start: nil, on_tool_result: nil, on_question: nil, logger: nil) ⇒ Orchestrator
Returns a new instance of Orchestrator.
# File 'lib/brute/orchestrator.rb', line 30

def initialize(
  provider:,
  model: nil,
  tools: Brute::TOOLS,
  cwd: Dir.pwd,
  session: nil,
  compactor_opts: {},
  reasoning: {},
  agent_name: nil,
  on_content: nil,
  on_reasoning: nil,
  on_tool_call_start: nil,
  on_tool_result: nil,
  on_question: nil,
  logger: nil
)
  @provider = provider
  @model = model
  @agent_name = agent_name
  @tool_classes = tools
  @cwd = cwd
  @session = session || Session.new
  @logger = logger || Logger.new($stderr, level: Logger::INFO)
  @message_store = @session.message_store

  # Build system prompt via deferred builder
  @system_prompt_builder = SystemPrompt.default
  @system_prompt = @system_prompt_builder.prepare(
    provider_name: @provider&.name,
    model_name: @model || @provider&.default_model,
    cwd: @cwd,
    custom_rules: load_custom_rules,
    agent: @agent_name,
  ).to_s

  # Initialize the LLM context (with streaming when callbacks provided)
  @stream =
    if on_content || on_reasoning
      AgentStream.new(
        on_content: on_content,
        on_reasoning: on_reasoning,
        on_question: on_question,
      )
    end
  ctx_opts = { tools: @tool_classes }
  ctx_opts[:model] = @model if @model
  ctx_opts[:stream] = @stream if @stream
  @context = LLM::Context.new(@provider, **ctx_opts)

  # Build the middleware pipeline
  compactor = Compactor.new(provider, **compactor_opts)
  @pipeline = build_pipeline(
    compactor: compactor,
    session: @session,
    logger: @logger,
    reasoning: reasoning,
    message_store: @message_store,
  )

  # The shared env hash, passed to every pipeline.call()
  @env = {
    context: @context,
    provider: @provider,
    tools: @tool_classes,
    input: nil,
    params: {},
    metadata: {},
    tool_results: nil,
    streaming: !!@stream,
    callbacks: {
      on_content: on_content,
      on_reasoning: on_reasoning,
      on_tool_call_start: on_tool_call_start,
      on_tool_result: on_tool_result,
      on_question: on_question,
    },
  }
end
Instance Attribute Details
#barrier ⇒ Object (readonly)
Returns the value of attribute barrier.
# File 'lib/brute/orchestrator.rb', line 28

def barrier
  @barrier
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/brute/orchestrator.rb', line 28

def context
  @context
end
#env ⇒ Object (readonly)
Returns the value of attribute env.
# File 'lib/brute/orchestrator.rb', line 28

def env
  @env
end
#message_store ⇒ Object (readonly)
Returns the value of attribute message_store.
# File 'lib/brute/orchestrator.rb', line 28

def message_store
  @message_store
end
#pipeline ⇒ Object (readonly)
Returns the value of attribute pipeline.
# File 'lib/brute/orchestrator.rb', line 28

def pipeline
  @pipeline
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
# File 'lib/brute/orchestrator.rb', line 28

def session
  @session
end
Instance Method Details
#abort! ⇒ Object
Cancel any in-flight tool execution. Safe to call from a signal handler, another thread, or an interface layer (TUI, web, RPC).
When called, Async::Stop is raised in each running fiber, unwinding through ensure blocks — so FileMutationQueue mutexes release cleanly and SnapshotStore stays consistent.
# File 'lib/brute/orchestrator.rb', line 348

def abort!
  @barrier&.stop
end
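The unwind-through-ensure guarantee can be demonstrated with plain threads, using Thread#raise as a stand-in for the Async::Stop that barrier.stop injects. The names and structure here are illustrative, not Brute's internals:

```ruby
# Illustrative sketch: cancelling a worker mid-task still releases its
# lock, because the injected exception unwinds through ensure blocks
# (Mutex#synchronize releases via ensure), just as Async::Stop unwinds
# through a tool fiber so FileMutationQueue-style mutexes release cleanly.
mutex = Mutex.new
started = Queue.new

worker = Thread.new do
  mutex.synchronize do # synchronize releases the mutex via its ensure
    started << true    # signal that the lock is held
    sleep              # simulate a long-running tool call
  end
rescue StandardError
  # cancelled; the mutex was already released on the way out
end

started.pop                         # wait until the worker holds the mutex
worker.raise(StandardError, "stop") # inject cancellation, like abort!
worker.join

mutex.locked? # => false: the ensure inside synchronize released it
```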
#run(user_message) ⇒ Object
Run a single user turn. Loops internally until the agent either completes (no more tool calls) or hits a limit.
Returns the final assistant response.
# File 'lib/brute/orchestrator.rb', line 112

def run(user_message)
  unless @provider
    raise "No LLM provider configured. Set LLM_API_KEY and optionally LLM_PROVIDER (default: opencode_zen)"
  end

  @request_count = 0

  # Build the initial prompt with system message on first turn
  input =
    if first_turn?
      @context.prompt do |p|
        p.system @system_prompt
        p.user user_message
      end
    else
      user_message
    end

  # --- First LLM call ---
  @env[:input] = input
  @env[:tool_results] = nil
  last_response = @pipeline.call(@env)
  sync_context!

  # --- Agent loop ---
  loop do
    # Collect pending tools from either source:
    # - Streaming: AgentStream deferred tools (collected during stream)
    # - Non-streaming: ctx.functions (populated by llm.rb after response)
    pending = collect_pending_tools
    break if pending.empty?

    # Fire on_tool_call_start ONCE with the full batch
    on_start = @env.dig(:callbacks, :on_tool_call_start)
    on_start&.call(pending.map { |tool, _| { name: tool.name, arguments: tool.arguments } })

    # Separate errors (tool not found) from executable tools
    errors = pending.select { |_, err| err }
    executable = pending.reject { |_, err| err }.map(&:first)

    # Execute tools concurrently, collect results
    results = execute_tool_calls(executable)

    # Append error results (tool not found, etc.)
    errors.each do |_, err|
      on_result = @env.dig(:callbacks, :on_tool_result)
      on_result&.call(err.name, result_value(err))
      results << err
    end

    # Send results back through the pipeline
    @env[:input] = results
    @env[:tool_results] = extract_tool_result_pairs(results)
    last_response = @pipeline.call(@env)
    sync_context!
    @request_count += 1

    # Check limits
    break if !has_pending_tools?
    break if @request_count >= MAX_REQUESTS_PER_TURN
    break if @env[:metadata][:tool_error_limit_reached]
  end

  last_response
end
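The control flow of the loop above can be reduced to a small sketch with a stubbed pipeline and tool runner. Every name here (`agent_loop`, the lambda shapes, the `{ tools: [...] }` response format) is hypothetical, chosen only to show the loop-until-no-tools structure and the per-turn request cap:

```ruby
# Illustrative sketch of the agent loop: call the pipeline, execute any
# requested tools, feed results back, and stop when no tools are pending
# or the per-turn request cap is reached.
MAX_REQUESTS_PER_TURN = 100

def agent_loop(pipeline, collect_pending_tools, execute_tools)
  request_count = 0
  last_response = pipeline.call(input: :user_message)

  loop do
    pending = collect_pending_tools.call(last_response)
    break if pending.empty? # done: the model asked for no more tools

    results = execute_tools.call(pending)
    last_response = pipeline.call(input: results) # send results back
    request_count += 1
    break if request_count >= MAX_REQUESTS_PER_TURN # safety limit
  end

  last_response
end

# Stub pipeline: requests one tool on the first call, then finishes.
calls = 0
pipeline = ->(env) { calls += 1; calls == 1 ? { tools: [:read] } : { tools: [] } }

final = agent_loop(
  pipeline,
  ->(resp) { resp[:tools] },                # collect pending tools
  ->(tools) { tools.map { |t| "#{t} ok" } } # execute and collect results
)
final # => { tools: [] } after two pipeline calls
```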