Class: Brute::Orchestrator

Inherits:
Object
  • Object
show all
Defined in:
lib/brute/orchestrator.rb

Overview

The core agent loop. Drives the cycle of:

prompt  LLM  tool calls  execute  send results  repeat

All cross-cutting concerns (retry, compaction, doom loop detection, token tracking, session persistence, tracing, reasoning) are implemented as Rack-style middleware in the Pipeline. The orchestrator is now a thin loop that:

1. Sends input through the pipeline (which wraps the LLM call)
2. Executes any tool calls the LLM requested
3. Repeats until done or a limit is hit

Tool execution is always deferred until after the LLM response (including streaming) completes. Tools then run concurrently with each other via Async::Barrier. on_tool_call_start fires once with the full batch before execution begins; on_tool_result fires per-tool as each finishes.

Constant Summary collapse

MAX_REQUESTS_PER_TURN =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider:, model: nil, tools: Brute::TOOLS, cwd: Dir.pwd, session: nil, compactor_opts: {}, reasoning: {}, agent_name: nil, on_content: nil, on_reasoning: nil, on_tool_call_start: nil, on_tool_result: nil, on_question: nil, logger: nil) ⇒ Orchestrator

Returns a new instance of Orchestrator.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/brute/orchestrator.rb', line 30

def initialize(
  provider:,
  model: nil,
  tools: Brute::TOOLS,
  cwd: Dir.pwd,
  session: nil,
  compactor_opts: {},
  reasoning: {},
  agent_name: nil,
  on_content: nil,
  on_reasoning: nil,
  on_tool_call_start: nil,
  on_tool_result: nil,
  on_question: nil,
  logger: nil
)
  @provider = provider
  @model = model
  @agent_name = agent_name
  @tool_classes = tools
  @cwd = cwd
  @session = session || Session.new
  @logger = logger || Logger.new($stderr, level: Logger::INFO)
  @message_store = @session.message_store

  # Build system prompt via deferred builder
  @system_prompt_builder = SystemPrompt.default
  @system_prompt = @system_prompt_builder.prepare(
    provider_name: @provider&.name,
    model_name: @model || @provider&.default_model,
    cwd: @cwd,
    custom_rules: load_custom_rules,
    agent: @agent_name,
  ).to_s

  # Initialize the LLM context (with streaming when callbacks provided)
  @stream = if on_content || on_reasoning
    AgentStream.new(
      on_content: on_content,
      on_reasoning: on_reasoning,
      on_question: on_question,
    )
  end
  ctx_opts = { tools: @tool_classes }
  ctx_opts[:model]  = @model  if @model
  ctx_opts[:stream] = @stream if @stream
  @context = LLM::Context.new(@provider, **ctx_opts)

  # Build the middleware pipeline
  compactor = Compactor.new(provider, **compactor_opts)
  @pipeline = build_pipeline(
    compactor: compactor,
    session: @session,
    logger: @logger,
    reasoning: reasoning,
    message_store: @message_store,
  )

  # The shared env hash — passed to every pipeline.call()
  @env = {
    context: @context,
    provider: @provider,
    tools: @tool_classes,
    input: nil,
    params: {},
    metadata: {},
    tool_results: nil,
    streaming: !!@stream,
    callbacks: {
      on_content: on_content,
      on_reasoning: on_reasoning,
      on_tool_call_start: on_tool_call_start,
      on_tool_result: on_tool_result,
      on_question: on_question,
    },
  }
end

Instance Attribute Details

#barrierObject (readonly)

Returns the value of attribute barrier.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def barrier
  @barrier
end

#contextObject (readonly)

Returns the value of attribute context.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def context
  @context
end

#envObject (readonly)

Returns the value of attribute env.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def env
  @env
end

#message_storeObject (readonly)

Returns the value of attribute message_store.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def message_store
  @message_store
end

#pipelineObject (readonly)

Returns the value of attribute pipeline.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def pipeline
  @pipeline
end

#sessionObject (readonly)

Returns the value of attribute session.



28
29
30
# File 'lib/brute/orchestrator.rb', line 28

def session
  @session
end

Instance Method Details

#abort!Object

Cancel any in-flight tool execution. Safe to call from a signal handler, another thread, or an interface layer (TUI, web, RPC).

When called, Async::Stop is raised in each running fiber, unwinding through ensure blocks — so FileMutationQueue mutexes release cleanly and SnapshotStore stays consistent.



348
349
350
# File 'lib/brute/orchestrator.rb', line 348

def abort!
  @barrier&.stop
end

#run(user_message) ⇒ Object

Run a single user turn. Loops internally until the agent either completes (no more tool calls) or hits a limit.

Returns the final assistant response.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/brute/orchestrator.rb', line 112

def run(user_message)
  unless @provider
    raise "No LLM provider configured. Set LLM_API_KEY and optionally LLM_PROVIDER (default: opencode_zen)"
  end

  @request_count = 0

  # Build the initial prompt with system message on first turn
  input = if first_turn?
    @context.prompt do |p|
      p.system @system_prompt
      p.user user_message
    end
  else
    user_message
  end

  # --- First LLM call ---
  @env[:input] = input
  @env[:tool_results] = nil
  last_response = @pipeline.call(@env)
  sync_context!

  # --- Agent loop ---
  loop do
    # Collect pending tools from either source:
    # - Streaming: AgentStream deferred tools (collected during stream)
    # - Non-streaming: ctx.functions (populated by llm.rb after response)
    pending = collect_pending_tools
    break if pending.empty?

    # Fire on_tool_call_start ONCE with the full batch
    on_start = @env.dig(:callbacks, :on_tool_call_start)
    on_start&.call(pending.map { |tool, _| { name: tool.name, arguments: tool.arguments } })

    # Separate errors (tool not found) from executable tools
    errors = pending.select { |_, err| err }
    executable = pending.reject { |_, err| err }.map(&:first)

    # Execute tools concurrently, collect results
    results = execute_tool_calls(executable)

    # Append error results (tool not found, etc.)
    errors.each do |_, err|
      on_result = @env.dig(:callbacks, :on_tool_result)
      on_result&.call(err.name, result_value(err))
      results << err
    end

    # Send results back through the pipeline
    @env[:input] = results
    @env[:tool_results] = extract_tool_result_pairs(results)
    last_response = @pipeline.call(@env)
    sync_context!

    @request_count += 1

    # Check limits
    break if !has_pending_tools?
    break if @request_count >= MAX_REQUESTS_PER_TURN
    break if @env[:metadata][:tool_error_limit_reached]
  end

  last_response
end