Class: Brute::Orchestrator::Turn

Inherits:
Object
  • Object
show all
Defined in:
lib/brute/orchestrator/turn.rb

Instance Method Summary collapse

Constructor Details

#initialize(env:, pending:) ⇒ Turn

Returns a new instance of Turn.



4
5
6
7
# File 'lib/brute/orchestrator/turn.rb', line 4

# Builds a turn over a batch of pending tool calls.
#
# @param env [#dig] environment; the other methods of this class look up
#   callbacks in it with +dig(:callbacks, ...)+
# @param pending [Enumerable] entries that are destructured elsewhere in this
#   class as +(tool, error)+ pairs
def initialize(env:, pending:)
  @pending = pending
  @env = env
end

Instance Method Details

#errors ⇒ Object



30
# File 'lib/brute/orchestrator/turn.rb', line 30

# Pending entries whose second element (the error) is truthy.
#
# @return the matching subset of +@pending+, entries left intact
def errors
  @pending.filter { |_tool, failure| failure }
end

#executable ⇒ Object



31
# File 'lib/brute/orchestrator/turn.rb', line 31

# Tools from the pending entries that carry no error.
#
# @return [Array] the first element of every entry whose error is nil/false
def executable
  runnable = @pending.reject { |_tool, failure| failure }
  runnable.map { |entry| entry.first }
end

#execute_parallel(functions) ⇒ Object

Run all pending tool calls concurrently via Async::Barrier.

Each tool runs in its own fiber. File-mutating tools are safe because they go through FileMutationQueue, whose Mutex is fiber-scheduler-aware in Ruby 3.4 — a fiber blocked on a per-file mutex yields to other fibers instead of blocking the thread.

The barrier is stored in @barrier so abort! can cancel in-flight tools.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/brute/orchestrator/turn.rb', line 79

# Runs every function as its own task under a shared Async::Barrier and
# returns the results positioned by each function's index in +functions+.
#
# The barrier is kept in +@barrier+ while the tasks run; it is stopped and
# cleared again when the block exits, whether normally or by exception.
#
# NOTE(review): when called from inside an already-running Async task,
# `Async do` presumably returns the child task without waiting, which would
# leave nil slots in the returned array — confirm callers always enter from
# outside a reactor.
#
# @param functions [Array] callables whose results respond to +name+
# @return [Array] one slot per function, filled in as the tasks complete
def execute_parallel(functions)
  report = @env.dig(:callbacks, :on_tool_result)
  question_handler = @env.dig(:callbacks, :on_question)
  results = Array.new(functions.size)

  Async do
    @barrier = Async::Barrier.new

    functions.each_with_index do |fn, slot|
      @barrier.async do
        # Thread#[] storage is fiber-local, so every task publishes the
        # question handler for itself.
        Thread.current[:on_question] = question_handler
        outcome = fn.call
        results[slot] = outcome
        report&.call(outcome.name, result_value(outcome))
      end
    end

    @barrier.wait
  ensure
    @barrier&.stop
    @barrier = nil
  end

  results
end

#execute_sequential(functions) ⇒ Object

Run the given tool calls synchronously, one after another, in the order given.



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/brute/orchestrator/turn.rb', line 58

# Calls each function in turn and collects the results in call order.
#
# Before every call the +on_question+ callback from the environment is
# published in +Thread.current[:on_question]+. After every call the
# +on_tool_result+ callback, when one is configured, receives the function's
# name and the value produced by +result_value+.
#
# @param functions [Enumerable] callables that also respond to +name+
# @return [Array] one result per function
def execute_sequential(functions)
  report = @env.dig(:callbacks, :on_tool_result)
  question_handler = @env.dig(:callbacks, :on_question)

  functions.each_with_object([]) do |fn, outcomes|
    Thread.current[:on_question] = question_handler
    outcome = fn.call
    report.call(fn.name, result_value(outcome)) unless report.nil?
    outcomes << outcome
  end
end

#execute_tool_calls ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/brute/orchestrator/turn.rb', line 33

# Runs every executable tool call and returns the collected results.
#
# Tools named "question" go first, one at a time: their answers may shape the
# rest of the work, so nothing else starts until they have finished. The
# remaining tools are dispatched concurrently when there are two or more of
# them and sequentially otherwise.
#
# @return [Array] question results first, followed by the other results;
#   empty when nothing is executable
def execute_tool_calls
  runnable = executable
  return [] if runnable.empty?

  questions, others = runnable.partition { |tool| tool.name == "question" }

  answers = questions.any? ? execute_sequential(questions) : []
  outcomes =
    if others.size > 1
      execute_parallel(others)
    else
      execute_sequential(others)
    end

  [].concat(answers, outcomes)
end

#perform ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/brute/orchestrator/turn.rb', line 9

# Executes the turn.
#
# 1. If an +on_tool_call_start+ callback is configured, it is called once with
#    a list of +{ name:, arguments: }+ hashes covering every pending entry.
# 2. The executable tool calls are run via +execute_tool_calls+.
# 3. Every pending entry that carries an error is reported through the
#    +on_tool_result+ callback (when configured) and its error is appended to
#    the results.
#
# @return [Array] results of the executed tools followed by the errors
def perform
  on_start = @env.dig(:callbacks, :on_tool_call_start)
  unless on_start.nil?
    requested = @pending.map do |tool, _failure|
      { name: tool.name, arguments: tool.arguments }
    end
    on_start.call(requested)
  end

  results = execute_tool_calls
  errors.each do |_tool, failure|
    @env.dig(:callbacks, :on_tool_result)&.call(failure.name, result_value(failure))
    results << failure
  end
  results
end