Class: Phronomy::Agent::TeamCoordinator

Inherits:
Object
Defined in:
lib/phronomy/agent/team_coordinator.rb

Overview

Implements the "Agent teams" coordination pattern (Anthropic blog, Pattern 3).

A coordinator LLM agent decomposes work into tasks and enqueues them dynamically via built-in tools. A fixed pool of worker agents claims tasks from the shared queue, carrying forward their conversation history across assignments to accumulate domain context over time.

The coordinator is an Agent::Base subclass with two built-in tools:

  • +enqueue_task+ — adds a task description to the queue
  • +finalize+ — signals that all tasks have been enqueued

Worker persistence is implemented by passing each worker's accumulated +messages+ array back as a top-level +messages:+ argument on every subsequent +invoke+ call, so the LLM retains context across multiple task assignments.
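The persistence mechanism described above can be pictured with a minimal sketch. The `fake_worker` lambda and its return shape are hypothetical stand-ins rather than Phronomy's actual worker API; the only detail taken from the text is that the accumulated messages are passed back as `messages:` on each call.

```ruby
# Hypothetical stand-in for a worker agent. The return shape is an assumption,
# not Phronomy's actual API; only the `messages:` keyword comes from the text.
fake_worker = lambda do |input, messages: []|
  history = messages + [{ role: :user, content: input }]
  earlier = messages.count { |m| m[:role] == :user }
  reply = "done: #{input} (#{earlier} earlier tasks in context)"
  { output: reply, messages: history + [{ role: :assistant, content: reply }] }
end

history = []
outputs = ["migrate billing", "migrate auth"].map do |task|
  result = fake_worker.call(task, messages: history)
  history = result[:messages] # carried forward into the next assignment
  result[:output]
end
```

In this sketch the second task sees the first task's exchange in its context, which is the effect the coordinator relies on for workers to accumulate domain knowledge.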

Examples:

Basic usage

class MigrationTeam < Phronomy::Agent::TeamCoordinator
  coordinator_model        "claude-3-5-sonnet-20241022"
  coordinator_instructions <<~INST
    Analyze the request and enqueue one migration task per service.
    Call enqueue_task for each service, then call finalize.
  INST

  pool size: 3, agent: MigrationAgent

  aggregate do |assignments|
    { reports: assignments.map { |a| { task: a[:task][:description], result: a[:result] } } }
  end
end

result = MigrationTeam.new.invoke("Migrate all services to Rails 8")

Class Method Details

.aggregate {|Array<Hash>| ... } ⇒ Object

Defines how task assignments are merged into the final return value. The block receives an Array of assignment Hashes, each of the form:

  { task: Hash, result: String|nil, worker: Integer, error: Exception|nil }

When omitted, the raw assignments array is returned.

Yields:

  • (Array<Hash>)

    all completed (and skipped) task assignments



# File 'lib/phronomy/agent/team_coordinator.rb', line 111

def aggregate(&block)
  @aggregator = block
end
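
As an illustration of what an aggregate block might do with the documented assignment shape, the following sketch separates successful and failed assignments. The sample data and the `summarize` lambda are made up for the example; a block with the same body could be passed to `aggregate`.

```ruby
# Assignment Hashes in the documented shape; the values are made up.
assignments = [
  { task: { description: "migrate billing" }, result: "ok", worker: 0, error: nil },
  { task: { description: "migrate auth" }, result: nil, worker: 1,
    error: RuntimeError.new("timeout") }
]

# Split assignments by whether an error was recorded, then summarize each group.
summarize = lambda do |items|
  failed, succeeded = items.partition { |a| a[:error] }
  {
    reports: succeeded.map { |a| { task: a[:task][:description], result: a[:result] } },
    failures: failed.map { |a| { task: a[:task][:description], error: a[:error].message } }
  }
end

summary = summarize.call(assignments)
```

Failed assignments presumably only reach the block when the pool is configured with `on_error: :skip`; with `:raise` the exception propagates instead.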

.coordinator_instructions(value = nil) ⇒ Object

Sets the system instructions for the coordinator agent. The prompt should direct the LLM to call +enqueue_task+ for each task and then call +finalize+ when all tasks are enqueued.

Parameters:

  • value (String, nil) (defaults to: nil)


# File 'lib/phronomy/agent/team_coordinator.rb', line 68

def coordinator_instructions(value = nil)
  value ? @coordinator_instructions = value : @coordinator_instructions
end

.coordinator_model(value = nil) ⇒ Object

Sets the LLM model for the coordinator agent. Falls back to +Phronomy.configuration.default_model+ when not set.

Parameters:

  • value (String, nil) (defaults to: nil)


# File 'lib/phronomy/agent/team_coordinator.rb', line 59

def coordinator_model(value = nil)
  value ? @coordinator_model = value : @coordinator_model
end
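
The accessor shown above doubles as setter and getter: called with an argument it stores the value, called without one it returns whatever was stored. A small sketch of that behaviour, using an anonymous stand-in class rather than the real `TeamCoordinator`, and a hypothetical `default_model` string in place of `Phronomy.configuration.default_model`:

```ruby
# Stand-in class reproducing the accessor shown above; not the real TeamCoordinator.
team = Class.new do
  def self.coordinator_model(value = nil)
    value ? @coordinator_model = value : @coordinator_model
  end
end

unset = team.coordinator_model # nil until a value is set
team.coordinator_model("claude-3-5-sonnet-20241022")
configured = team.coordinator_model

# Hypothetical fallback mirroring the documented default_model behaviour.
default_model = "some-default-model"
effective = team.coordinator_model || default_model
```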

.coordinator_provider(value = nil) ⇒ Object

Sets the LLM provider for the coordinator agent. Required when using a custom +BASE_URL+ (e.g. LM Studio, Ollama, vLLM) so that RubyLLM does not attempt to resolve an unknown model name. Pass the same value as +LLMConfig::PROVIDER+ in your examples.

Parameters:

  • value (Symbol, nil) (defaults to: nil)


# File 'lib/phronomy/agent/team_coordinator.rb', line 78

def coordinator_provider(value = nil)
  value ? @coordinator_provider = value : @coordinator_provider
end

.pool(size:, agent:, on_error: :raise) ⇒ Object

Configures the worker pool.

Parameters:

  • size (Integer)

    number of persistent worker instances

  • agent (Class)

    Agent::Base subclass used for all workers

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ (default) propagates worker exceptions; +:skip+ records the failure and continues with remaining tasks



# File 'lib/phronomy/agent/team_coordinator.rb', line 88

def pool(size:, agent:, on_error: :raise)
  @pool_size = Integer(size)
  @worker_agent = agent
  @on_error = on_error
end
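
The difference between the two `on_error` modes can be sketched as follows. `run_tasks` is a hypothetical helper written for this example and is not how Phronomy implements its worker loop; it only mirrors the documented semantics of `:raise` and `:skip`.

```ruby
# Hypothetical helper mirroring the documented on_error semantics.
def run_tasks(tasks, on_error: :raise)
  tasks.map do |task|
    { task: task, result: yield(task), error: nil }
  rescue StandardError => e
    raise if on_error == :raise # propagate the worker exception unchanged

    { task: task, result: nil, error: e } # :skip records the failure and moves on
  end
end

outcomes = run_tasks(%w[a b c], on_error: :skip) do |task|
  raise ArgumentError, "bad task" if task == "b"

  task.upcase
end
```

With `:skip`, every task produces an entry and the failed one carries its exception; with `:raise`, the first failure aborts the run.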

.schedule {|Array<WorkerState>| ... } ⇒ Object

Customises the worker selection algorithm. The block receives an Array of available WorkerState objects and must return the one that should receive the next task. By default, the worker with the fewest accumulated messages is chosen, which behaves roughly like round-robin.

Yields:

  • (Array<WorkerState>)

    available workers

Yield Returns:

  • (WorkerState)

    the chosen worker



# File 'lib/phronomy/agent/team_coordinator.rb', line 101

def schedule(&block)
  @scheduler = block
end
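
A sketch of two selection strategies that fit the contract above. The Struct used here is a hypothetical stand-in for WorkerState; the real class may expose different fields, and `index` and `messages` are assumptions made for the example.

```ruby
# Hypothetical WorkerState shape; the real class may expose different fields.
worker_state = Struct.new(:index, :messages)

workers = [
  worker_state.new(0, %w[m1 m2 m3 m4]),
  worker_state.new(1, %w[m1 m2]),
  worker_state.new(2, [])
]

# Approximation of the documented default: fewest accumulated messages wins.
least_loaded = ->(available) { available.min_by { |w| w.messages.size } }

# A custom alternative: always prefer the lowest worker index.
lowest_index = ->(available) { available.min_by(&:index) }

default_pick = least_loaded.call(workers)
custom_pick = lowest_index.call(workers)
```

Preferring the lowest index would concentrate context in one worker, while the least-loaded strategy spreads tasks across the pool.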

Instance Method Details

#invoke(team_input, config: {}) ⇒ Object

Runs the full team coordination: coordinator generates tasks, workers process them sequentially, and the aggregate block merges the results.

Parameters:

  • team_input (String, Hash)

    the high-level objective given to the coordinator

  • config (Hash) (defaults to: {})

    reserved for future use

Returns:

  • (Object)

    the return value of the aggregate block, or the raw assignments Array

Raises:

  • (ArgumentError)

    when +pool :agent+ has not been configured



# File 'lib/phronomy/agent/team_coordinator.rb', line 140

def invoke(team_input, config: {})
  raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent

  task_queue = []
  run_coordinator(team_input, task_queue)
  assignments = run_workers(task_queue)
  finalize_result(assignments)
end

#stream(team_input, config: {}) {|Hash| ... } ⇒ Object

Streaming version of +invoke+. Yields a Hash event for each completed or failed task assignment.

Yielded Hash keys:

  • :type - +:task_completed+ or +:task_failed+
  • :worker - worker index (Integer)
  • :task - the task Hash from the queue ({ id:, description:, metadata:, enqueued_at: })
  • :result - output string, or +nil+ on failure
  • :error - Exception, or +nil+ on success

Parameters:

  • team_input (String, Hash)
  • config (Hash) (defaults to: {})

Yields:

  • (Hash)

    one event per completed/failed task

Returns:

  • (Object)

    same as +invoke+

Raises:

  • (ArgumentError)

    when +pool :agent+ has not been configured



# File 'lib/phronomy/agent/team_coordinator.rb', line 164

def stream(team_input, config: {}, &block)
  return invoke(team_input, config: config) unless block

  raise ArgumentError, "pool :agent must be configured before invoking" unless self.class._worker_agent

  task_queue = []
  run_coordinator(team_input, task_queue)
  assignments = run_workers(task_queue, &block)
  finalize_result(assignments)
end
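
A block passed to `stream` receives one event Hash per task. The sketch below shows one way such a handler might be written; the sample events are made up in the documented shape, and they are fed to the handler directly so the example runs without a configured team.

```ruby
# Events in the documented shape; the values are made up.
events = [
  { type: :task_completed, worker: 0, task: { id: 1, description: "migrate billing" },
    result: "ok", error: nil },
  { type: :task_failed, worker: 1, task: { id: 2, description: "migrate auth" },
    result: nil, error: RuntimeError.new("timeout") }
]

log = []
handler = lambda do |event|
  label = event[:task][:description]
  case event[:type]
  when :task_completed
    log << "worker #{event[:worker]} finished #{label}"
  when :task_failed
    log << "worker #{event[:worker]} failed #{label}: #{event[:error].message}"
  end
end

events.each(&handler)
```

With a real team, the same lambda could presumably be supplied as the block, for example `team.stream(input, &handler)`.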