Class: Phronomy::Agent::Orchestrator

Inherits:
Base
  • Object
Defined in:
lib/phronomy/agent/orchestrator.rb

Overview

Base class for orchestrator agents that coordinate multiple subagents. Implements the Orchestrator-Subagent multi-agent coordination pattern (Anthropic blog, Pattern 2).

Extends Base with:

  • A +subagent+ class-level DSL for declarative subagent registration. Each declared subagent is automatically exposed as an LLM-callable tool.
  • +dispatch_parallel+ for programmatic parallel invocation of heterogeneous agents.
  • +fan_out+ for parallel invocation of the same agent across multiple inputs.

Examples:

Declarative DSL

class ResearchOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "You coordinate research tasks."
  subagent :searcher,   SearchAgent
  subagent :summarizer, SummaryAgent
end

result = ResearchOrchestrator.new.invoke("Research the latest AI news.")

Programmatic parallel dispatch

class MyOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "Dispatch tasks in parallel."

  def run(input)
    results = dispatch_parallel(
      { agent: SearchAgent,   input: "topic A" },
      { agent: AnalysisAgent, input: input }
    )
    results.map { |r| r[:output] }.join("\n")
  end
end

Fan-out (same agent, multiple inputs)

results = fan_out(agent: TranslationAgent, inputs: ["Hello", "World"])

Instance Attribute Summary

Attributes included from Concerns::BeforeCompletion

#before_completion

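Class Method Summary

  • .registered_subagents ⇒ Hash{Symbol => Hash}
  • .subagent ⇒ Object

Instance Method Summary

  • #dispatch_parallel ⇒ Array<Hash, nil>
  • #fan_out ⇒ Array<Hash, nil>
  • #subagent ⇒ Hash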

Methods inherited from Base

#_add_handoff_tool, #_handoff_tools, _on_compact_callback, _on_compaction_trigger_callback, _on_trim_callback, cache_instructions, context_overhead, #context_version_cache, context_window, instructions, #invoke, invoke_timeout, max_iterations, max_output_tokens, max_parallel_tools, model, on_compact, on_compaction_trigger, on_trim, provider, #run_as_child, static_knowledge, static_knowledge_chunks, static_knowledge_refresh!, static_knowledge_sources, #stream, temperature, tool_aliases, tools

Methods included from Concerns::Suspendable

#on_approval_required, #resume

Methods included from Concerns::BeforeCompletion

included

Methods included from Concerns::Guardrailable

#add_input_guardrail, #add_output_guardrail

Methods included from Concerns::Retryable

included

Methods included from Runnable

#batch, #invoke, #stream, #trace

Class Method Details

.registered_subagents ⇒ Hash{Symbol => Hash}

Returns the subagent registry for this specific class (not inherited).

Returns:

  • (Hash{Symbol => Hash})


# File 'lib/phronomy/agent/orchestrator.rb', line 92

def self.registered_subagents
  @registered_subagents ||= {}
end
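
The "not inherited" behaviour follows from keeping the registry in a class-level instance variable, which Ruby stores separately on each class object. A minimal sketch, using hypothetical stand-in classes (+RegistryBase+, +ParentOrchestrator+ and +ChildOrchestrator+ are illustrative and not part of Phronomy):

```ruby
# Hypothetical stand-in classes; only the memoization pattern matches the
# source shown above.
class RegistryBase
  def self.registry
    @registry ||= {} # class-level ivar: one Hash per class object
  end
end

class ParentOrchestrator < RegistryBase; end
class ChildOrchestrator < ParentOrchestrator; end

ParentOrchestrator.registry[:searcher] = {agent_class: String, on_error: :raise}

ParentOrchestrator.registry.keys # => [:searcher]
ChildOrchestrator.registry.keys  # => []
```

Whether the generated tools themselves are visible to subclasses depends on how Base resolves +tools+, which this page does not show.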

.subagent(name, agent_class, on_error: :raise) ⇒ Object

Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.

Each call appends a new tool to this class's tool list. The generated tool's function name is +dispatch_to_<name>+ (for example, +subagent :searcher, SearchAgent+ produces +dispatch_to_searcher+). When the LLM calls the tool, a fresh instance of +agent_class+ is created and +invoke+ is called with the provided input string.

Parameters:

  • name (Symbol)

    logical name that identifies the subagent

  • agent_class (Class)

    subclass of Base

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ (default) re-raises any exception from the subagent; +:skip+ returns +nil+ so the LLM can decide how to proceed



# File 'lib/phronomy/agent/orchestrator.rb', line 59

def self.subagent(name, agent_class, on_error: :raise)
  tool_class = Class.new(Phronomy::Tool::Base) do
    tool_name "dispatch_to_#{name}"
    description "Dispatch work to the #{name} subagent (#{agent_class.name})"
    param :input, type: :string, desc: "The task or question for the subagent"

    define_method(:execute) do |input:|
      # Inherit the calling orchestrator's thread_id and config when
      # available so that sub-agent spans and memory stay connected.
      ctx = Thread.current[:phronomy_orchestrator_context] || {}
      result = agent_class.new.invoke(
        input,
        thread_id: ctx[:thread_id],
        config: ctx[:config] || {}
      )
      result[:output]
    rescue
      raise if on_error == :raise
      nil
    end
  end

  # Append without clobbering previously registered tools or aliases.
  @tools = (@tools || []) + [tool_class]
  @tool_aliases ||= {}

  registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
end
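
The effect of +on_error+ on the generated tool can be tried in isolation. The sketch below copies only the rescue logic from +execute+; +FlakyAgent+ and +call_subagent+ are hypothetical names introduced for illustration and are not part of Phronomy.

```ruby
# Hypothetical stand-in for a subagent class.
class FlakyAgent
  def invoke(input, **_opts)
    raise ArgumentError, "empty input" if input.empty?
    {output: "handled: #{input}"}
  end
end

# Mirrors the rescue logic of the generated tool's #execute:
# :raise re-raises, anything else returns nil.
def call_subagent(agent_class, input, on_error: :raise)
  agent_class.new.invoke(input)[:output]
rescue
  raise if on_error == :raise
  nil
end

call_subagent(FlakyAgent, "task")             # => "handled: task"
call_subagent(FlakyAgent, "", on_error: :skip) # => nil
```

With the default +:raise+, the second call would propagate the +ArgumentError+ instead of returning +nil+.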

Instance Method Details

#dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>

Dispatches multiple heterogeneous agent tasks in parallel using Ruby threads. Each task is a Hash describing one agent invocation.

Results are returned in the same order as the input +tasks+ array. Concurrency is bounded by +max_concurrency+; when it is +nil+, all tasks run at once (the original behaviour).

Error semantics are controlled by +on_error+:

  • +:raise+ (default) — every task runs to completion; the first exception in input order is then re-raised in the calling thread.
  • +:skip+ — failed tasks return +nil+; no exception is raised.
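
The two error modes above can be sketched with plain threads. This is an illustration of the documented semantics under stated assumptions, not Phronomy's internal implementation; +run_all+ is a hypothetical helper and the tasks are plain lambdas rather than agents.

```ruby
# Hypothetical helper: runs every callable in its own thread and applies
# the documented on_error semantics afterwards.
def run_all(tasks, on_error: :raise)
  threads = tasks.map do |task|
    Thread.new do
      [:ok, task.call]
    rescue => e
      [:error, e] # capture instead of killing the thread
    end
  end

  # Joining in input order keeps results aligned with tasks.
  outcomes = threads.map(&:value)

  first_failure = outcomes.find { |status, _| status == :error }
  raise first_failure[1] if first_failure && on_error == :raise

  outcomes.map { |status, value| status == :ok ? value : nil }
end

tasks = [
  -> { sleep 0.02; "a" },
  -> { raise "boom" },
  -> { "c" }
]

run_all(tasks, on_error: :skip) # => ["a", nil, "c"]
```

Under +:raise+, the same call waits for all three threads and then raises the +RuntimeError+ from the second task.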

Parameters:

  • tasks (Array<Hash>)
  • max_concurrency (Integer, nil) (defaults to: nil)

    maximum number of concurrent threads; nil means no limit (all tasks run simultaneously)

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ or +:skip+

  • timeout (Numeric, nil) (defaults to: nil)

    maximum seconds to wait for all workers; nil means wait indefinitely. When the deadline is exceeded, TimeoutError is raised; surviving worker threads are killed only when +force_kill+ is +true+ (see below).

  • cancellation_token (Phronomy::CancellationToken, nil) (defaults to: nil)

    when provided, the token is merged into each task's config (unless the task already sets one) so that every worker agent checks it before making LLM calls.

  • force_kill (Boolean) (defaults to: false)

    when +true+, surviving worker threads are killed with +Thread#kill+ after the grace period if they do not stop cooperatively. When +false+ (default), workers are asked to stop cooperatively but are never killed; the caller receives TimeoutError immediately and abandoned workers discard their results when they eventually finish. +false+ is safer for production because +Thread#kill+ can interrupt +ensure+ blocks.

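  • task (Hash)

    one agent invocation. The examples and #fan_out on this page use the keys +:agent+ (agent class) and +:input+ (String), and optionally +:config+ (Hash) and +:thread_id+ (String)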

Returns:

  • (Array<Hash, nil>)

    agent results in the same order as +tasks+

Raises:

  • (ArgumentError)

    if +on_error+ is not +:raise+ or +:skip+

  • (ArgumentError)

    if +max_concurrency+ is not a positive Integer or nil

  • (Phronomy::TimeoutError)

    if +timeout+ is exceeded
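
The interaction between +timeout+, +cancellation_token+ and +force_kill+ can be approximated as follows. Everything here is a hypothetical stand-in: +SketchToken+, +SketchTimeout+, +wait_with_deadline+ and the +grace+ parameter are illustrative names, and the real API of Phronomy::CancellationToken is not shown on this page.

```ruby
# Hypothetical cooperative cancellation flag.
class SketchToken
  def initialize
    @cancelled = false
    @lock = Mutex.new
  end

  def cancel!
    @lock.synchronize { @cancelled = true }
  end

  def cancelled?
    @lock.synchronize { @cancelled }
  end
end

class SketchTimeout < StandardError; end

# Waits for all threads until a shared deadline. On expiry, asks workers to
# stop via the token, optionally kills stragglers, then raises.
def wait_with_deadline(threads, token, timeout:, force_kill: false, grace: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    next if thread.join([remaining, 0].max) # join returns nil on timeout

    token.cancel!
    threads.each { |w| w.join(grace) || w.kill } if force_kill
    raise SketchTimeout, "workers exceeded #{timeout}s"
  end
  threads.map(&:value)
end
```

A worker that polls +token.cancelled?+ between units of work exits on its own shortly after the deadline; only a worker that never checks the token needs the +force_kill+ path.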



# File 'lib/phronomy/agent/orchestrator.rb', line 133

def dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false)
  unless [:raise, :skip].include?(on_error)
    raise ArgumentError, "unknown on_error: #{on_error.inspect}"
  end
  if max_concurrency && !(max_concurrency.is_a?(Integer) && max_concurrency.positive?)
    raise ArgumentError, "max_concurrency must be a positive Integer"
  end

  bounded_map(tasks, max_concurrency: max_concurrency, on_error: on_error, timeout: timeout, cancellation_token: cancellation_token, force_kill: force_kill)
end
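
+bounded_map+ itself is not shown on this page. One common way to bound concurrency while preserving input order is a fixed pool of worker threads draining a queue, sketched below. +bounded_map_sketch+ is a hypothetical helper and may differ from the real implementation; it omits error handling, timeouts and cancellation.

```ruby
# Hypothetical helper: at most max_concurrency threads run the block at once,
# and results land at the index of their input.
def bounded_map_sketch(items, max_concurrency: nil, &block)
  limit = max_concurrency || items.size
  results = Array.new(items.size)

  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # pop returns nil once the closed queue is empty

  workers = Array.new([limit, items.size].min) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        results[index] = block.call(item)
      end
    end
  end

  workers.each(&:join)
  results
end

bounded_map_sketch([1, 2, 3, 4, 5], max_concurrency: 2) { |n| n * 10 }
# => [10, 20, 30, 40, 50]
```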

#fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>

Runs the same agent against multiple inputs in parallel (fan-out pattern).

Accepts the same +max_concurrency:+, +on_error:+, +timeout:+, +cancellation_token:+ and +force_kill:+ keyword arguments as #dispatch_parallel and forwards them unchanged.

Parameters:

  • agent (Class)

    agent class to invoke for every input

  • inputs (Array<String>)

    list of input strings

  • config (Hash) (defaults to: {})

    forwarded to every +agent#invoke+ call

  • thread_id (String, nil) (defaults to: nil)

    forwarded to every +agent#invoke+ call

  • max_concurrency (Integer, nil) (defaults to: nil)

    forwarded to #dispatch_parallel

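  • on_error (Symbol) (defaults to: :raise)

    forwarded to #dispatch_parallel

  • timeout (Numeric, nil) (defaults to: nil)

    forwarded to #dispatch_parallel

  • cancellation_token (Phronomy::CancellationToken, nil) (defaults to: nil)

    forwarded to #dispatch_parallel

  • force_kill (Boolean) (defaults to: false)

    forwarded to #dispatch_parallel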

Returns:

  • (Array<Hash, nil>)

    results in the same order as +inputs+



# File 'lib/phronomy/agent/orchestrator.rb', line 157

def fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false)
  dispatch_parallel(
    *inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} },
    max_concurrency: max_concurrency,
    on_error: on_error,
    timeout: timeout,
    cancellation_token: cancellation_token,
    force_kill: force_kill
  )
end
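
The task-building step of +fan_out+ can be examined on its own. In the sketch below, +fan_out_tasks+ and +TranslatorStub+ are hypothetical names used for illustration; the mapping expression is the one from the source above.

```ruby
# Hypothetical helper mirroring the inputs.map step of #fan_out.
def fan_out_tasks(agent:, inputs:, config: {}, thread_id: nil)
  inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} }
end

TranslatorStub = Class.new # hypothetical stand-in for an agent class

tasks = fan_out_tasks(agent: TranslatorStub, inputs: ["Hello", "World"], config: {lang: "fr"})

tasks.map { |t| t[:input] }                 # => ["Hello", "World"]
tasks[0][:config].equal?(tasks[1][:config]) # => true (same Hash object)
```

Because every task receives the same +config+ Hash object, it is safest to treat +config+ as read-only inside agents that run concurrently.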

#subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash

Programmatically dispatches a single sub-agent from inside an orchestrator instance, inheriting the parent's +thread_id+ and +config+ by default.

Parameters:

  • agent_class (Class)

    subclass of Base

  • input (String)

    task or question for the sub-agent

  • config (Hash, nil) (defaults to: nil)

    override config (falls back to parent's)

  • thread_id (String, nil) (defaults to: nil)

    override thread_id (falls back to parent's)

Returns:

  • (Hash)

    the sub-agent's result hash (+:output+, +:messages+)



# File 'lib/phronomy/agent/orchestrator.rb', line 177

def subagent(agent_class, input, config: nil, thread_id: nil)
  ctx = Thread.current[:phronomy_orchestrator_context] || {}
  agent_class.new.invoke(
    input,
    config: config || ctx[:config] || {},
    thread_id: thread_id || ctx[:thread_id]
  )
end
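
The fallback chain above reads its defaults from +Thread.current+, and values stored with +Thread#[]+ are not inherited by newly created threads. The sketch below shows both effects; +resolve_call_options+ and the +:sketch_orchestrator_context+ key are hypothetical, and whether Phronomy copies the context into its worker threads is not shown on this page.

```ruby
# Hypothetical helper reproducing the explicit -> context -> default fallback.
def resolve_call_options(config: nil, thread_id: nil)
  ctx = Thread.current[:sketch_orchestrator_context] || {}
  {
    config: config || ctx[:config] || {},
    thread_id: thread_id || ctx[:thread_id]
  }
end

Thread.current[:sketch_orchestrator_context] = {thread_id: "t-1", config: {user: "alice"}}

resolve_call_options
# => {config: {user: "alice"}, thread_id: "t-1"}

resolve_call_options(thread_id: "t-2")[:thread_id]
# => "t-2"

# A fresh thread starts with no context, so the defaults apply.
Thread.new { resolve_call_options }.value
# => {config: {}, thread_id: nil}
```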