Class: Phronomy::Agent::TeamCoordinator
- Inherits: Object
- Inheritance: Object, Phronomy::Agent::TeamCoordinator
- Defined in: lib/phronomy/agent/team_coordinator.rb
Overview
Implements the "Agent teams" coordination pattern (Anthropic blog, Pattern 3).
A coordinator LLM agent decomposes work into tasks and enqueues them dynamically via built-in tools. A fixed set of worker agents processes tasks sequentially — one task per worker per turn — carrying forward their conversation history across assignments to accumulate domain context over time.
Workers are selected in sequence (the worker with the fewest accumulated messages is chosen by default). Task dispatch is synchronous; there is no concurrent or parallel execution.
The coordinator is a Base subclass with two built-in tools:
- +enqueue_task+ — adds a task description to the queue
- +finalize+ — signals that all tasks have been enqueued
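To make the roles of the two tools concrete, here is a minimal sketch of a queue with an enqueue step and a finalize step. +SketchTaskQueue+ is a hypothetical stand-in, not Phronomy's implementation. The task Hash keys follow the shape documented for +stream+ events (id, description, metadata, enqueued_at); rejecting tasks after +finalize+ is an assumption made for illustration.

```ruby
require "securerandom"

# Hypothetical stand-in for the coordinator's task queue; not Phronomy's code.
class SketchTaskQueue
  attr_reader :tasks

  def initialize
    @tasks = []
    @finalized = false
  end

  # Plays the role of +enqueue_task+: appends one task description.
  def enqueue_task(description, metadata: {})
    # Assumption: enqueueing after finalize is treated as an error here.
    raise "queue already finalized" if @finalized

    task = {
      id: SecureRandom.uuid,
      description: description,
      metadata: metadata,
      enqueued_at: Time.now
    }
    @tasks << task
    task
  end

  # Plays the role of +finalize+: marks the queue as complete.
  def finalize
    @finalized = true
  end

  def finalized?
    @finalized
  end
end

queue = SketchTaskQueue.new
queue.enqueue_task("Summarise chapter 1")
queue.enqueue_task("Summarise chapter 2", metadata: { priority: "low" })
queue.finalize
puts queue.tasks.map { |t| t[:description] }.inspect
```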
Worker persistence is implemented by passing each worker's accumulated +messages+ array back as a top-level +messages:+ argument on every subsequent +invoke+ call, so the LLM retains context across multiple task assignments.
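The hand-off described above can be modelled without an LLM. In the sketch below, +sketch_invoke+ is a hypothetical worker stand-in, and its return shape (+output+ plus +messages+) is an assumption rather than the library's documented response format. It only illustrates how passing the accumulated history back on each call lets a worker see its earlier assignments, combined with the default fewest-messages selection.

```ruby
# Hypothetical worker stand-in: echoes the task and returns the grown history.
# A real worker would call an LLM; this one only models the messages hand-off.
def sketch_invoke(task, messages: [])
  history = messages + [{ role: :user, content: task }]
  reply = "done: #{task} (context: #{messages.size} prior messages)"
  history << { role: :assistant, content: reply }
  { output: reply, messages: history }
end

workers = Array.new(2) { { messages: [] } }
tasks = ["task A", "task B", "task C"]

results = tasks.map do |task|
  # Default selection described above: fewest accumulated messages first.
  worker = workers.min_by { |w| w[:messages].size }
  response = sketch_invoke(task, messages: worker[:messages])
  worker[:messages] = response[:messages] # carried into the next assignment
  response[:output]
end

puts results
```

In this run the third task goes back to the first worker, which by then holds the two messages from "task A".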
Class Method Summary
- .aggregate {|Array<Hash>| ... } ⇒ Object
  Defines how task assignments are merged into the final return value.
- .coordinator_instructions(value = nil) ⇒ Object
  Sets the system instructions for the coordinator agent.
- .coordinator_model(value = nil) ⇒ Object
  Sets the LLM model for the coordinator agent.
- .coordinator_provider(value = nil) ⇒ Object
  Sets the LLM provider for the coordinator agent.
- .pool(size:, agent:, on_error: :raise) ⇒ Object
  Configures the set of workers.
- .schedule {|Array<WorkerState>| ... } ⇒ Object
  Customises the worker selection algorithm.
Instance Method Summary
- #invoke(team_input, config: {}) ⇒ Object
  Runs the full team coordination: coordinator generates tasks, workers process them sequentially, and the aggregate block merges the results.
- #stream(team_input, config: {}) {|Hash| ... } ⇒ Object
  Streaming version of +invoke+.
Class Method Details
.aggregate {|Array<Hash>| ... } ⇒ Object
Defines how task assignments are merged into the final return value. The block receives an Array of assignment Hashes, each of the form:
{ task: Hash, result: String|nil, worker: Integer, error: Exception|nil }
When omitted, the raw assignments array is returned.
# File 'lib/phronomy/agent/team_coordinator.rb', line 121
def aggregate(&block)
  @aggregator = block
end
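As an illustration of what an aggregate block might do with the assignment Hashes described above, the sketch below runs a plain lambda over hand-written sample data. The sample values and the shape of the returned summary are made up for the example. Only the four assignment keys (task, result, worker, error) come from this page.

```ruby
# Sample assignments in the documented shape; the values are made up.
assignments = [
  { task: { id: 1, description: "Summarise intro" }, result: "Intro summary", worker: 0, error: nil },
  { task: { id: 2, description: "Summarise methods" }, result: nil, worker: 1, error: RuntimeError.new("timeout") },
  { task: { id: 3, description: "Summarise results" }, result: "Results summary", worker: 0, error: nil }
]

# A block of this shape could be passed to +aggregate+.
aggregator = lambda do |items|
  succeeded, failed = items.partition { |a| a[:error].nil? }
  {
    report: succeeded.map { |a| a[:result] }.join("\n"),
    failed_task_ids: failed.map { |a| a[:task][:id] }
  }
end

summary = aggregator.call(assignments)
puts summary[:report]
puts "failed: #{summary[:failed_task_ids].inspect}"
```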
.coordinator_instructions(value = nil) ⇒ Object
Sets the system instructions for the coordinator agent. The prompt should direct the LLM to call +enqueue_task+ for each task and then call +finalize+ when all tasks are enqueued.
# File 'lib/phronomy/agent/team_coordinator.rb', line 74
def coordinator_instructions(value = nil)
  value ? @coordinator_instructions = value : @coordinator_instructions
end
.coordinator_model(value = nil) ⇒ Object
Sets the LLM model for the coordinator agent. Falls back to +Phronomy.configuration.default_model+ when not set.
# File 'lib/phronomy/agent/team_coordinator.rb', line 64
def coordinator_model(value = nil)
  value ? @coordinator_model = value : @coordinator_model
end
.coordinator_provider(value = nil) ⇒ Object
Sets the LLM provider for the coordinator agent. Required when using a custom +BASE_URL+ (e.g. LM Studio, Ollama, vLLM) so that RubyLLM does not attempt to resolve an unknown model name. Pass the same value as +LLMConfig::PROVIDER+ in your examples.
# File 'lib/phronomy/agent/team_coordinator.rb', line 85
def coordinator_provider(value = nil)
  value ? @coordinator_provider = value : @coordinator_provider
end
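The three +coordinator_*+ methods above share one idiom: called with an argument they store it, called without one they return the stored value. The sketch below reproduces that idiom on a hypothetical +SketchCoordinator+ class. +DEFAULT_MODEL+ and +resolved_model+ are assumptions standing in for a configured default, and this is not how Phronomy necessarily resolves its fallback.

```ruby
# Hypothetical class, not part of Phronomy; shows the reader/writer idiom only.
class SketchCoordinator
  DEFAULT_MODEL = "default-model" # stand-in for a configured default

  class << self
    # With an argument: stores it. Without: returns the stored value.
    def coordinator_model(value = nil)
      value ? @coordinator_model = value : @coordinator_model
    end

    # One possible way to apply a fallback at read time (an assumption).
    def resolved_model
      coordinator_model || DEFAULT_MODEL
    end
  end
end

class TunedCoordinator < SketchCoordinator
  coordinator_model "my-local-model"
end

puts SketchCoordinator.resolved_model
puts TunedCoordinator.resolved_model
```

Two properties of the idiom are worth keeping in mind: the value lives in a class-level instance variable, so a setting on one class does not leak into its parent, and a stored value cannot be reset by passing +nil+ because +nil+ is treated as a read.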
.pool(size:, agent:, on_error: :raise) ⇒ Object
Configures the set of workers.
# File 'lib/phronomy/agent/team_coordinator.rb', line 96
def pool(size:, agent:, on_error: :raise)
  @pool_size = Integer(size)
  @worker_agent = agent
  @on_error = on_error
end
.schedule {|Array<WorkerState>| ... } ⇒ Object
Customises the worker selection algorithm. The block receives an Array of available WorkerState objects and must return the one to assign the next task to. Default: worker with the fewest accumulated messages (round-robin-like).
# File 'lib/phronomy/agent/team_coordinator.rb', line 110
def schedule(&block)
  @scheduler = block
end
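A scheduler block is a function from the available workers to one chosen worker. The sketch below shows two such functions as lambdas. +SketchWorkerState+ and its +index+ and +messages+ fields are hypothetical, since this page does not list the real WorkerState attributes. The first lambda mirrors the documented default and the second is an arbitrary custom policy.

```ruby
# Hypothetical stand-in: the real WorkerState fields are not shown on this page.
SketchWorkerState = Struct.new(:index, :messages)

states = [
  SketchWorkerState.new(0, Array.new(4) { "msg" }),
  SketchWorkerState.new(1, Array.new(2) { "msg" }),
  SketchWorkerState.new(2, Array.new(6) { "msg" })
]

# Equivalent of the documented default: fewest accumulated messages.
fewest_messages = ->(workers) { workers.min_by { |w| w.messages.size } }

# A custom alternative: first worker under a message cap, else the least loaded.
under_cap = lambda do |workers|
  workers.find { |w| w.messages.size < 5 } || workers.min_by { |w| w.messages.size }
end

puts fewest_messages.call(states).index
puts under_cap.call(states).index
```

The second policy keeps routing work to the same worker until its history reaches the cap, which trades even load for deeper accumulated context.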
Instance Method Details
#invoke(team_input, config: {}) ⇒ Object
Runs the full team coordination: coordinator generates tasks, workers process them sequentially, and the aggregate block merges the results.
# File 'lib/phronomy/agent/team_coordinator.rb', line 151
def invoke(team_input, config: {})
  raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent

  task_queue = []
  run_coordinator(team_input, task_queue)
  assignments = run_workers(task_queue)
  finalize_result(assignments)
end
#stream(team_input, config: {}) {|Hash| ... } ⇒ Object
Streaming version of +invoke+. Yields a Hash event for each completed or failed task assignment.
Yielded Hash keys:
- :type: +:task_completed+ or +:task_failed+
- :worker: worker index (Integer)
- :task: the task Hash from the queue ({ id:, description:, metadata:, enqueued_at: })
- :result: output string, or +nil+ on failure
- :error: Exception, or +nil+ on success
# File 'lib/phronomy/agent/team_coordinator.rb', line 176
def stream(team_input, config: {}, &block)
  return invoke(team_input, config: config) unless block

  raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent

  task_queue = []
  run_coordinator(team_input, task_queue)
  assignments = run_workers(task_queue, &block)
  finalize_result(assignments)
end
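A block given to +stream+ receives one event Hash per finished assignment. The sketch below feeds hand-written events with the keys listed above into a handler lambda, so it runs without the library. The event values and the log format are made up for illustration.

```ruby
# Made-up events in the documented shape, standing in for what +stream+ yields.
events = [
  { type: :task_completed, worker: 0, task: { id: "t1", description: "Draft outline" },
    result: "Outline ready", error: nil },
  { type: :task_failed, worker: 1, task: { id: "t2", description: "Fetch sources" },
    result: nil, error: RuntimeError.new("rate limited") }
]

log = []

# A block of this shape could be given to +stream+.
handler = lambda do |event|
  label = "worker #{event[:worker]} / #{event[:task][:description]}"
  case event[:type]
  when :task_completed
    log << "OK   #{label}: #{event[:result]}"
  when :task_failed
    log << "FAIL #{label}: #{event[:error].message}"
  end
end

events.each(&handler)
puts log
```

As the source above shows, calling +stream+ without a block falls back to +invoke+, so a handler like this is only exercised when a block is supplied.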