Class: Phronomy::Agent::Orchestrator

Inherits:
Base
  • Object
Defined in:
lib/phronomy/agent/orchestrator.rb

Overview

Base class for orchestrator agents that coordinate multiple subagents. Implements the Orchestrator-Subagent multi-agent coordination pattern (Anthropic blog, Pattern 2).

Extends Base with:

  • A +subagent+ class-level DSL for declarative subagent registration. Each declared subagent is automatically exposed as an LLM-callable tool.
  • +dispatch_parallel+ for programmatic parallel invocation of heterogeneous agents.
  • +fan_out+ for parallel invocation of the same agent across multiple inputs.

Examples:

Declarative DSL

class ResearchOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "You coordinate research tasks."
  subagent :searcher,   SearchAgent
  subagent :summarizer, SummaryAgent
end

result = ResearchOrchestrator.new.invoke("Research the latest AI news.")

Programmatic parallel dispatch

class MyOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "Dispatch tasks in parallel."

  def run(input)
    results = dispatch_parallel(
      { agent: SearchAgent,   input: "topic A" },
      { agent: AnalysisAgent, input: input }
    )
    results.map { |r| r[:output] }.join("\n")
  end
end

Fan-out (same agent, multiple inputs)

results = fan_out(agent: TranslationAgent, inputs: ["Hello", "World"])

Instance Attribute Summary

Attributes included from Concerns::BeforeCompletion

#before_completion

Class Method Summary

Instance Method Summary

Methods inherited from Base

#_add_handoff_tool, #_handoff_tools, _on_compact_callback, _on_compaction_trigger_callback, _on_trim_callback, cache_instructions, context_overhead, #context_version_cache, context_window, instructions, #invoke, max_iterations, max_output_tokens, model, on_compact, on_compaction_trigger, on_trim, provider, static_knowledge, static_knowledge_sources, #stream, temperature, tool_aliases, tools

Methods included from Concerns::Suspendable

#on_approval_required, #resume

Methods included from Concerns::BeforeCompletion

included

Methods included from Concerns::Guardrailable

#add_input_guardrail, #add_output_guardrail

Methods included from Concerns::Retryable

included

Methods included from Runnable

#batch, #invoke, #stream, #trace

Class Method Details

.registered_subagents ⇒ Hash{Symbol => Hash}

Returns the subagent registry for this specific class (not inherited).

Returns:

  • (Hash{Symbol => Hash})

# File 'lib/phronomy/agent/orchestrator.rb', line 83

def self.registered_subagents
  @registered_subagents ||= {}
end
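
The "not inherited" behaviour follows from memoizing the registry in a class-level instance variable, which every class object holds separately. The sketch below illustrates this with stand-in classes: +SketchOrchestrator+, +ParentOrchestrator+, and +ChildOrchestrator+ are hypothetical names, and +String+ and +Integer+ merely stand in for agent classes. It does not load Phronomy and says nothing about how the tool list itself is inherited.

```ruby
# Hypothetical stand-in for the orchestrator base class. Only the memoized
# class-level instance variable mirrors the source shown above.
class SketchOrchestrator
  def self.registered_subagents
    @registered_subagents ||= {}
  end

  # Simplified registration: records the entry, builds no tool.
  def self.subagent(name, agent_class)
    registered_subagents[name] = {agent_class: agent_class}
  end
end

class ParentOrchestrator < SketchOrchestrator
  subagent :searcher, String # String stands in for an agent class
end

class ChildOrchestrator < ParentOrchestrator
  subagent :summarizer, Integer # Integer stands in for an agent class
end

# Each class object owns its own @registered_subagents, so the child's
# registry does not contain the parent's entry.
PARENT_KEYS = ParentOrchestrator.registered_subagents.keys
CHILD_KEYS  = ChildOrchestrator.registered_subagents.keys
```

In this sketch +PARENT_KEYS+ holds only +:searcher+ and +CHILD_KEYS+ holds only +:summarizer+, so code that needs the full set would have to walk the ancestor chain itself.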

.subagent(name, agent_class, on_error: :raise) ⇒ Object

Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.

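Each call appends a new tool to this class's tool list. The generated tool's function name is +dispatch_to_<name>+ (for example, +dispatch_to_searcher+ for +subagent :searcher+). When the LLM calls the tool, a fresh instance of +agent_class+ is created and +invoke+ is called with the provided input string; the tool returns the +:output+ value of the result.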

Parameters:

  • name (Symbol)

    logical name that identifies the subagent

  • agent_class (Class)

    subclass of Base

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ (default) re-raises any error from the subagent; +:skip+ rescues the error (a bare +rescue+, so any StandardError) and returns +nil+ so the LLM can decide how to proceed

# File 'lib/phronomy/agent/orchestrator.rb', line 58

def self.subagent(name, agent_class, on_error: :raise)
  tool_class = Class.new(Phronomy::Tool::Base) do
    tool_name "dispatch_to_#{name}"
    description "Dispatch work to the #{name} subagent (#{agent_class.name})"
    param :input, type: :string, desc: "The task or question for the subagent"

    define_method(:execute) do |input:|
      result = agent_class.new.invoke(input)
      result[:output]
    rescue
      raise if on_error == :raise
      nil
    end
  end

  # Append without clobbering previously registered tools or aliases.
  @tools = (@tools || []) + [tool_class]
  @tool_aliases ||= {}

  registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
end
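
The two +on_error+ policies can be tried in isolation. The sketch below copies the body of the generated tool's +execute+ method into a plain helper; +run_subagent+, +EchoAgent+, and +FailingAgent+ are hypothetical names introduced for illustration, and the stand-in agents only imitate the +invoke+ return shape (a Hash with an +:output+ key) that the source relies on.

```ruby
# Hypothetical stand-in agents. Real subagents would subclass the library's
# agent base class; these only imitate #invoke returning {output: ...}.
class EchoAgent
  def invoke(input)
    {output: "echo: #{input}"}
  end
end

class FailingAgent
  def invoke(_input)
    raise "subagent failed"
  end
end

# Mirrors the body of the generated tool's #execute method shown above.
def run_subagent(agent_class, input, on_error: :raise)
  result = agent_class.new.invoke(input) # fresh instance per call
  result[:output]
rescue
  # Bare rescue: only StandardError and its subclasses are handled here.
  raise if on_error == :raise
  nil
end

OK_OUTPUT = run_subagent(EchoAgent, "hi")                    # normal path
SKIPPED   = run_subagent(FailingAgent, "hi", on_error: :skip) # error swallowed
RAISED    = begin
  run_subagent(FailingAgent, "hi") # default :raise policy
  false
rescue RuntimeError
  true
end
```

Under +:skip+ the caller cannot tell a failed subagent from one that legitimately returned +nil+, which is worth keeping in mind when writing the orchestrator's instructions.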

Instance Method Details

#dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise) ⇒ Array<Hash, nil>

Dispatches multiple heterogeneous agent tasks in parallel using Ruby threads. Each task is a Hash describing one agent invocation.

Results are returned in the same order as the input +tasks+ array, regardless of which task finishes first. Concurrency is bounded by +max_concurrency+; when it is +nil+, all tasks run at once (the original behaviour).

Error semantics are controlled by +on_error+:

  • +:raise+ (default) — every task runs to completion; the first exception in input order is then re-raised in the calling thread.
  • +:skip+ — failed tasks return +nil+; no exception is raised.

Parameters:

  • tasks (Array<Hash>)
  • max_concurrency (Integer, nil) (defaults to: nil)

    maximum number of concurrent threads; nil means no limit (all tasks run simultaneously)

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ or +:skip+

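  • task (Hash)

    one agent invocation; the examples on this page and #fan_out build it with the keys +:agent+ (agent class), +:input+, and optionally +:config+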

Returns:

  • (Array<Hash, nil>)

    agent results in the same order as +tasks+

Raises:

  • (ArgumentError)

    if +on_error+ is not +:raise+ or +:skip+

  • (ArgumentError)

    if +max_concurrency+ is not a positive Integer or nil

# File 'lib/phronomy/agent/orchestrator.rb', line 109

def dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise)
  unless [:raise, :skip].include?(on_error)
    raise ArgumentError, "unknown on_error: #{on_error.inspect}"
  end
  if max_concurrency && !(max_concurrency.is_a?(Integer) && max_concurrency.positive?)
    raise ArgumentError, "max_concurrency must be a positive Integer"
  end

  bounded_map(tasks, max_concurrency: max_concurrency, on_error: on_error)
end
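
The private +bounded_map+ helper is not listed on this page. As a rough illustration of the documented contract (ordered results, bounded concurrency, and the two +on_error+ policies), the following is a minimal sketch built on a worker pool draining a +Queue+. +bounded_map_sketch+ and +UpcaseAgent+ are hypothetical names, the +:config+ key is ignored, and the real implementation may differ.

```ruby
# Hypothetical sketch of the documented contract; NOT the library's bounded_map.
def bounded_map_sketch(tasks, max_concurrency: nil, on_error: :raise)
  return [] if tasks.empty?

  results = Array.new(tasks.size) # slot i holds the result of task i
  errors  = Array.new(tasks.size) # slot i holds the exception of task i

  queue = Queue.new
  tasks.each_with_index { |task, index| queue << [task, index] }
  queue.close # once drained, a closed queue makes #pop return nil

  # nil means "no limit": one worker thread per task.
  worker_count = [max_concurrency || tasks.size, tasks.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (item = queue.pop)
        task, index = item
        begin
          results[index] = task[:agent].new.invoke(task[:input])
        rescue => e
          errors[index] = e # record the failure; remaining tasks still run
        end
      end
    end
  end
  workers.each(&:join)

  # :raise re-raises the first failure in input order after all tasks finish;
  # :skip leaves nil in the slots of failed tasks.
  first_error = errors.compact.first
  raise first_error if first_error && on_error == :raise
  results
end

# Hypothetical stand-in agent that fails on empty input.
class UpcaseAgent
  def invoke(input)
    raise ArgumentError, "empty input" if input.empty?
    {output: input.upcase}
  end
end

TASKS = [
  {agent: UpcaseAgent, input: "a"},
  {agent: UpcaseAgent, input: ""},
  {agent: UpcaseAgent, input: "c"}
]
SKIP_RESULTS = bounded_map_sketch(TASKS, max_concurrency: 2, on_error: :skip)
```

With +on_error: :skip+ the middle slot of +SKIP_RESULTS+ is +nil+ and the other two hold the agents' result Hashes, so callers that map over the results (as in the +run+ example in the overview) should guard against +nil+ entries.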

#fan_out(agent:, inputs:, config: {}, max_concurrency: nil, on_error: :raise) ⇒ Array<Hash, nil>

Runs the same agent against multiple inputs in parallel (fan-out pattern).

Accepts the same +max_concurrency:+ and +on_error:+ keyword arguments as #dispatch_parallel and forwards them unchanged.

Parameters:

  • agent (Class)

    agent class to invoke for every input

  • inputs (Array<String>)

    list of input strings

  • config (Hash) (defaults to: {})

    forwarded to every +agent#invoke+ call

  • max_concurrency (Integer, nil) (defaults to: nil)

    forwarded to #dispatch_parallel

  • on_error (Symbol) (defaults to: :raise)

    forwarded to #dispatch_parallel

Returns:

  • (Array<Hash, nil>)

    results in the same order as +inputs+

# File 'lib/phronomy/agent/orchestrator.rb', line 131

def fan_out(agent:, inputs:, config: {}, max_concurrency: nil, on_error: :raise)
  dispatch_parallel(
    *inputs.map { |input| {agent: agent, input: input, config: config} },
    max_concurrency: max_concurrency,
    on_error: on_error
  )
end
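
As the listing shows, +fan_out+ is a thin wrapper: it expands +inputs+ into one task Hash per input and hands them to #dispatch_parallel. The sketch below reproduces only that expansion step so the resulting task shape can be inspected; +fan_out_tasks+ is a hypothetical helper, and +TranslationAgent+ is defined here as an empty placeholder class.

```ruby
# Placeholder constant standing in for a real agent class.
TranslationAgent = Class.new

# Hypothetical helper: the same expansion fan_out performs before it
# calls dispatch_parallel.
def fan_out_tasks(agent:, inputs:, config: {})
  inputs.map { |input| {agent: agent, input: input, config: config} }
end

FAN_OUT_TASKS = fan_out_tasks(agent: TranslationAgent, inputs: ["Hello", "World"])

# Every task references the SAME config Hash object, not a copy.
SHARED_CONFIG = FAN_OUT_TASKS.map { |task| task[:config].object_id }.uniq.size == 1
```

Because the one +config+ Hash is shared by every task, it is safest to treat it as read-only (for example by passing a frozen Hash) when the tasks run on separate threads.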