Class: Phronomy::Agent::TeamCoordinator
- Inherits:
-
Object
- Object
- Phronomy::Agent::TeamCoordinator
- Defined in:
- lib/phronomy/agent/team_coordinator.rb
Overview
Implements the "Agent teams" coordination pattern (Anthropic blog, Pattern 3).
A coordinator LLM agent decomposes work into tasks and enqueues them dynamically via built-in tools. A fixed pool of worker agents claims tasks from the shared queue, carrying forward their conversation history across assignments to accumulate domain context over time.
The coordinator is an Base subclass that has two built-in tools:
- +enqueue_task+ — adds a task description to the queue
- +finalize+ — signals that all tasks have been enqueued
Worker persistence is implemented by passing each worker's accumulated +messages+ array back as a top-level +messages:+ argument on every subsequent +invoke+ call, so the LLM retains context across multiple task assignments.
Class Method Summary collapse
-
.aggregate {|Array<Hash>| ... } ⇒ Object
Defines how task assignments are merged into the final return value.
-
.coordinator_instructions(value = nil) ⇒ Object
Sets the system instructions for the coordinator agent.
-
.coordinator_model(value = nil) ⇒ Object
Sets the LLM model for the coordinator agent.
-
.coordinator_provider(value = nil) ⇒ Object
Sets the LLM provider for the coordinator agent.
-
.pool(size:, agent:, on_error: :raise) ⇒ Object
Configures the worker pool.
-
.schedule {|Array<WorkerState>| ... } ⇒ Object
Customises the worker selection algorithm.
Instance Method Summary collapse
-
#invoke(team_input, config: {}) ⇒ Object
Runs the full team coordination: coordinator generates tasks, workers process them sequentially, and the aggregate block merges the results.
-
#stream(team_input, config: {}) {|Hash| ... } ⇒ Object
Streaming version of +invoke+.
Class Method Details
.aggregate {|Array<Hash>| ... } ⇒ Object
Defines how task assignments are merged into the final return value. The block receives an Array of assignment Hashes: { task: Hash, result: String|nil, worker: Integer, error: Exception|nil } When omitted, the raw assignments array is returned.
111 112 113 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 111 def aggregate(&block) @aggregator = block end |
.coordinator_instructions(value = nil) ⇒ Object
Sets the system instructions for the coordinator agent. The prompt should direct the LLM to call +enqueue_task+ for each task and then call +finalize+ when all tasks are enqueued.
68 69 70 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 68 def coordinator_instructions(value = nil) value ? @coordinator_instructions = value : @coordinator_instructions end |
.coordinator_model(value = nil) ⇒ Object
Sets the LLM model for the coordinator agent. Falls back to +Phronomy.configuration.default_model+ when not set.
59 60 61 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 59 def coordinator_model(value = nil) value ? @coordinator_model = value : @coordinator_model end |
.coordinator_provider(value = nil) ⇒ Object
Sets the LLM provider for the coordinator agent. Required when using a custom +BASE_URL+ (e.g. LM Studio, Ollama, vLLM) so that RubyLLM does not attempt to resolve an unknown model name. Pass the same value as +LLMConfig::PROVIDER+ in your examples.
78 79 80 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 78 def coordinator_provider(value = nil) value ? @coordinator_provider = value : @coordinator_provider end |
.pool(size:, agent:, on_error: :raise) ⇒ Object
Configures the worker pool.
88 89 90 91 92 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 88 def pool(size:, agent:, on_error: :raise) @pool_size = Integer(size) @worker_agent = agent @on_error = on_error end |
.schedule {|Array<WorkerState>| ... } ⇒ Object
Customises the worker selection algorithm. The block receives an Array of available WorkerState objects and must return the one to assign the next task to. Default: worker with the fewest accumulated messages (round-robin-like).
101 102 103 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 101 def schedule(&block) @scheduler = block end |
Instance Method Details
#invoke(team_input, config: {}) ⇒ Object
Runs the full team coordination: coordinator generates tasks, workers process them sequentially, and the aggregate block merges the results.
140 141 142 143 144 145 146 147 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 140 def invoke(team_input, config: {}) raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent task_queue = [] run_coordinator(team_input, task_queue) assignments = run_workers(task_queue) finalize_result(assignments) end |
#stream(team_input, config: {}) {|Hash| ... } ⇒ Object
Streaming version of +invoke+. Yields a Hash event for each completed or failed task assignment.
Yielded Hash keys: :type — +:task_completed+ or +:task_failed+ :worker — worker index (Integer) :task — the task Hash from the queue ({ id:, description:, metadata:, enqueued_at: }) :result — output string, or +nil+ on failure :error — Exception, or +nil+ on success
164 165 166 167 168 169 170 171 172 173 |
# File 'lib/phronomy/agent/team_coordinator.rb', line 164 def stream(team_input, config: {}, &block) return invoke(team_input, config: config) unless block raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent task_queue = [] run_coordinator(team_input, task_queue) assignments = run_workers(task_queue, &block) finalize_result(assignments) end |