Class: Phronomy::Agent::Orchestrator

Inherits:
Base
  • Object
Defined in:
lib/phronomy/agent/orchestrator.rb

Overview

Base class for orchestrator agents that coordinate multiple subagents. Implements the Orchestrator-Subagent multi-agent coordination pattern (Anthropic blog, Pattern 2).

Extends Base with:

  • A +subagent+ class-level DSL for declarative subagent registration. Each declared subagent is automatically exposed as an LLM-callable tool.
  • +dispatch_parallel+ for programmatic parallel invocation of heterogeneous agents.
  • +fan_out+ for parallel invocation of the same agent across multiple inputs.

Examples:

Declarative DSL

class ResearchOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "You coordinate research tasks."
  subagent :searcher,   SearchAgent
  subagent :summarizer, SummaryAgent
end

result = ResearchOrchestrator.new.invoke("Research the latest AI news.")

Programmatic parallel dispatch

class MyOrchestrator < Phronomy::Agent::Orchestrator
  model "gpt-4o"
  instructions "Dispatch tasks in parallel."

  def run(input)
    results = dispatch_parallel(
      { agent: SearchAgent,   input: "topic A" },
      { agent: AnalysisAgent, input: input }
    )
    results.map { |r| r[:output] }.join("\n")
  end
end

Fan-out (same agent, multiple inputs)

# fan_out is an instance method, so call it from inside an orchestrator method.
results = fan_out(agent: TranslationAgent, inputs: ["Hello", "World"])

Instance Attribute Summary

Attributes included from Concerns::BeforeCompletion

#before_completion

Class Method Summary

Instance Method Summary

Methods inherited from Base

#_add_handoff_tool, #_handoff_tools, _on_compact_callback, _on_compaction_trigger_callback, _on_trim_callback, cache_instructions, context_overhead, #context_version_cache, context_window, instructions, #invoke, #invoke_async, invoke_timeout, max_iterations, max_output_tokens, max_parallel_tools, model, on_compact, on_compaction_trigger, on_trim, provider, #run_as_child, static_knowledge, static_knowledge_chunks, static_knowledge_refresh!, static_knowledge_sources, #stream, temperature, tool_aliases, tools

Methods included from Concerns::Suspendable

#on_approval_required, #resume, #scope_policy=

Methods included from Concerns::BeforeCompletion

included

Methods included from Concerns::Guardrailable

#add_input_guardrail, #add_output_guardrail

Methods included from Concerns::Retryable

included

Methods included from Runnable

#batch, #invoke, #stream, #trace

Class Method Details

._subagent_tool_classesArray<Class>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the subagent tool classes registered on this specific class. Used by #prepare_tool_class to inject context.

Returns:

  • (Array<Class>)


# File 'lib/phronomy/agent/orchestrator.rb', line 109

def self._subagent_tool_classes
  @_subagent_tool_classes || []
end

.registered_subagentsHash{Symbol => Hash}

Returns the subagent registry for this specific class (not inherited).

Returns:

  • (Hash{Symbol => Hash})


# File 'lib/phronomy/agent/orchestrator.rb', line 117

def self.registered_subagents
  @registered_subagents ||= {}
end
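
The "not inherited" behaviour follows from storing the registry in a class-level instance variable, which belongs to each class object separately. The sketch below illustrates this with stand-in classes only; +SketchOrchestrator+, +ParentSketch+, and +ChildSketch+ are hypothetical names, and the placeholder agent classes are ordinary core classes. It does not show how the real Base handles tool inheritance.

```ruby
# Hypothetical stand-in for the registry pattern; not the Phronomy classes.
class SketchOrchestrator
  # A class-level instance variable: every class object owns its own Hash.
  def self.registered_subagents
    @registered_subagents ||= {}
  end

  def self.subagent(name, agent_class)
    registered_subagents[name] = {agent_class: agent_class}
  end
end

class ParentSketch < SketchOrchestrator
  subagent :searcher, String # String is only a placeholder agent class
end

class ChildSketch < ParentSketch
  subagent :summarizer, Integer # placeholder again
end

p ParentSketch.registered_subagents.keys # prints [:searcher]
p ChildSketch.registered_subagents.keys  # prints [:summarizer]
```

Under this pattern a subclass that needs the parent's entries has to redeclare them or merge the parent's registry explicitly.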

.subagent(name, agent_class, on_error: :raise) ⇒ Object

Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.

Each call appends a new tool to this class's tool list. The generated tool's function name is +dispatch_to_<name>+. When the LLM calls the tool, a fresh instance of +agent_class+ is created and invoked with the provided input string (via +invoke_async+ followed by +await+), and the +:output+ value of the result is returned to the LLM.

Parameters:

  • name (Symbol)

    logical name that identifies the subagent

  • agent_class (Class)

    subclass of Base

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ (default) re-raises any exception from the subagent; +:skip+ returns +nil+ so the LLM can decide how to proceed



# File 'lib/phronomy/agent/orchestrator.rb', line 59

def self.subagent(name, agent_class, on_error: :raise)
  tool_class = Class.new(Phronomy::Tool::Base) do
    tool_name "dispatch_to_#{name}"
    description "Dispatch work to the #{name} subagent (#{agent_class.name})"
    param :input, type: :string, desc: "The task or question for the subagent"

    # @_orchestrator_context is injected at call time by prepare_tool_class.
    attr_writer :_orchestrator_context

    define_method(:execute) do |input:|
      # Inherit the calling orchestrator's thread_id, config, and
      # InvocationContext so that child subagent spans and memory stay
      # connected to the parent invocation.
      ctx = @_orchestrator_context || {}
      parent_ic = ctx[:invocation_context]
      task_config = ctx[:config] || {}

      # Propagate parent InvocationContext to the child agent so that
      # cancellation, deadline, and tracing carry through automatically.
      if parent_ic && !task_config[:invocation_context]
        child_ic = parent_ic.merge(parent_task_id: parent_ic.task_id)
        task_config = task_config.merge(invocation_context: child_ic)
      end

      result = agent_class.new.invoke_async(
        input,
        thread_id: ctx[:thread_id] || parent_ic&.thread_id,
        config: task_config
      ).await
      result[:output]
    rescue
      raise if on_error == :raise
      nil
    end
  end

  # Track this tool class so prepare_tool_class can inject context.
  @_subagent_tool_classes = (@_subagent_tool_classes || []) + [tool_class]

  # Append without clobbering previously registered tools or aliases.
  @tools = (@tools || []) + [tool_class]
  @tool_aliases ||= {}

  registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
end
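
The +on_error+ handling in the generated tool can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper +build_dispatch_tool+ that builds an anonymous class the same way (+Class.new+ plus +define_method+) but runs a caller-supplied block instead of a real agent. The plain +tool_name+ method stands in for the library's DSL.

```ruby
# Hypothetical helper that mirrors the shape of the generated tool class.
def build_dispatch_tool(name, on_error: :raise, &work)
  Class.new do
    # The real tool sets its name through a DSL; a plain method stands in here.
    define_singleton_method(:tool_name) { "dispatch_to_#{name}" }

    define_method(:execute) do |input:|
      work.call(input)
    rescue
      # A bare rescue catches StandardError and its subclasses only.
      raise if on_error == :raise
      nil
    end
  end
end

failing = ->(_input) { raise ArgumentError, "subagent failed" }

skip_tool = build_dispatch_tool(:searcher, on_error: :skip, &failing)
raise_tool = build_dispatch_tool(:searcher, on_error: :raise, &failing)

p skip_tool.tool_name                      # prints "dispatch_to_searcher"
p skip_tool.new.execute(input: "topic")    # prints nil, the error was swallowed

begin
  raise_tool.new.execute(input: "topic")
rescue ArgumentError => e
  puts "re-raised: #{e.message}"           # prints "re-raised: subagent failed"
end
```

With +:skip+ the tool result is +nil+, so instructions for the orchestrator may need to say how a missing subagent answer should be treated.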

Instance Method Details

#dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>

Dispatches multiple heterogeneous agent tasks in parallel using cooperative Tasks. Each task is a Hash describing one agent invocation.

Results are returned in the same order as the input +tasks+ array. Concurrency is bounded by +max_concurrency+; when nil all tasks run at once (original behaviour).

Error semantics are controlled by +on_error+:

  • +:raise+ (default) — every task runs to completion; the first exception in input order is then re-raised in the calling task.
  • +:skip+ — failed tasks return +nil+; no exception is raised.

Parameters:

  • tasks (Array<Hash>)
  • max_concurrency (Integer, nil) (defaults to: nil)

    maximum number of concurrent tasks; nil means no limit (all tasks run simultaneously)

  • on_error (Symbol) (defaults to: :raise)

    +:raise+ or +:skip+

  • timeout (Numeric, nil) (defaults to: nil)

    maximum seconds to wait for all tasks; nil means wait indefinitely. When the deadline is exceeded, TimeoutError is raised and all surviving tasks are cancelled cooperatively.

  • cancellation_token (Phronomy::CancellationToken, nil) (defaults to: nil)

    when provided, the token is merged into each task's config (unless the task already sets one) so that every child agent checks it before making LLM calls.

  • invocation_context (Phronomy::InvocationContext, nil) (defaults to: nil)

    when provided, the context (cancellation_token, deadline, thread_id) is propagated to each child agent as a child InvocationContext.

  • force_kill (Boolean) (defaults to: false)

    deprecated — cooperative cancellation is always used; this parameter is accepted for backwards compatibility but has no effect.

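  • task (Hash)

    a single task descriptor. The examples above and #fan_out build these with the keys +:agent+ and +:input+, plus the optional +:config+ and +:thread_id+.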

Returns:

  • (Array<Hash, nil>)

    agent results in the same order as +tasks+

Raises:

  • (ArgumentError)

    if +on_error+ is not +:raise+ or +:skip+

  • (ArgumentError)

    if +max_concurrency+ is not a positive Integer or nil

  • (Phronomy::TimeoutError)

    if +timeout+ is exceeded



# File 'lib/phronomy/agent/orchestrator.rb', line 158

def dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false)
  unless [:raise, :skip].include?(on_error)
    raise ArgumentError, "unknown on_error: #{on_error.inspect}"
  end
  if max_concurrency && !(max_concurrency.is_a?(Integer) && max_concurrency.positive?)
    raise ArgumentError, "max_concurrency must be a positive Integer"
  end

  bounded_map(tasks, max_concurrency: max_concurrency, on_error: on_error, timeout: timeout, cancellation_token: cancellation_token, invocation_context: invocation_context, force_kill: force_kill)
end
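
The listing above delegates to +bounded_map+, which is not shown on this page. The sketch below is not that implementation. It is a simplified illustration of the documented contract (results in input order, at most +max_concurrency+ workers, and the two +on_error+ modes), and it swaps the library's cooperative Tasks for plain Ruby threads. The name +bounded_map_sketch+ is hypothetical, and timeout and cancellation handling are left out.

```ruby
# Hypothetical stand-in: plain threads replace the library's cooperative Tasks.
def bounded_map_sketch(items, max_concurrency: nil, on_error: :raise)
  unless [:raise, :skip].include?(on_error)
    raise ArgumentError, "unknown on_error: #{on_error.inspect}"
  end

  results = Array.new(items.size)
  errors = Array.new(items.size)

  # Queue the indices up front, then close the queue so that pop returns nil
  # once it is drained and each worker leaves its loop.
  queue = Queue.new
  items.each_index { |index| queue << index }
  queue.close

  worker_count = [max_concurrency || items.size, items.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (index = queue.pop)
        begin
          results[index] = yield(items[index])
        rescue => e
          errors[index] = e # the matching slot in results stays nil
        end
      end
    end
  end
  workers.each(&:join) # every item runs to completion before errors surface

  first_error = errors.compact.first # first failure in input order
  raise first_error if on_error == :raise && first_error
  results
end

slow = bounded_map_sketch([3, 1, 2], max_concurrency: 2) do |n|
  sleep(n * 0.01)
  n * 10
end
p slow # prints [30, 10, 20]

p bounded_map_sketch([1, 0, 2], on_error: :skip) { |n| 10 / n } # prints [10, nil, 5]
```

Writing each result into a preallocated slot by index is what keeps the output aligned with the input regardless of completion order.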

#fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>

Runs the same agent against multiple inputs in parallel (fan-out pattern).

Builds one task per input and delegates to #dispatch_parallel, forwarding +max_concurrency:+, +on_error:+, +timeout:+, +cancellation_token:+, +invocation_context:+, and +force_kill:+ unchanged.

Parameters:

  • agent (Class)

    agent class to invoke for every input

  • inputs (Array<String>)

    list of input strings

  • config (Hash) (defaults to: {})

    forwarded to every +agent#invoke+ call

  • thread_id (String, nil) (defaults to: nil)

    forwarded to every +agent#invoke+ call

  • max_concurrency (Integer, nil) (defaults to: nil)

    forwarded to #dispatch_parallel

  • on_error (Symbol) (defaults to: :raise)

    forwarded to #dispatch_parallel

  • invocation_context (Phronomy::InvocationContext, nil) (defaults to: nil)

    forwarded to #dispatch_parallel for child context propagation

Returns:

  • (Array<Hash, nil>)

    results in the same order as +inputs+



# File 'lib/phronomy/agent/orchestrator.rb', line 184

def fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false)
  dispatch_parallel(
    *inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} },
    max_concurrency: max_concurrency,
    on_error: on_error,
    timeout: timeout,
    cancellation_token: cancellation_token,
    invocation_context: invocation_context,
    force_kill: force_kill
  )
end
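
To make the delegation concrete, the sketch below reproduces only the task-list construction from the listing above. +fan_out_tasks+ and +TranslationAgentStub+ are hypothetical names used for illustration, and nothing is dispatched.

```ruby
# Hypothetical helper showing only how the per-input task Hashes are built.
def fan_out_tasks(agent:, inputs:, config: {}, thread_id: nil)
  inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} }
end

TranslationAgentStub = Class.new # placeholder for a real agent class

tasks = fan_out_tasks(agent: TranslationAgentStub, inputs: ["Hello", "World"])
p tasks.map { |task| task[:input] }                 # prints ["Hello", "World"]
p tasks[0][:config].equal?(tasks[1][:config])       # prints true
```

The second line of output reflects that every task receives the same +config+ Hash object, so it seems safest to treat that Hash as read-only while the tasks run.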

#subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash

Programmatically dispatches a single sub-agent from inside an orchestrator instance, inheriting the parent's +thread_id+ and +config+ by default.

Parameters:

  • agent_class (Class)

    subclass of Base

  • input (String)

    task or question for the sub-agent

  • config (Hash, nil) (defaults to: nil)

    override config (falls back to parent's)

  • thread_id (String, nil) (defaults to: nil)

    override thread_id (falls back to parent's)

Returns:

  • (Hash)

    the sub-agent's result hash (+:output+, +:messages+)



# File 'lib/phronomy/agent/orchestrator.rb', line 205

def subagent(agent_class, input, config: nil, thread_id: nil)
  ctx = @_orchestrator_context || {}
  parent_ic = ctx[:invocation_context]
  effective_config = config || ctx[:config] || {}

  # Propagate parent InvocationContext to the child agent.
  if parent_ic && !effective_config[:invocation_context]
    child_ic = parent_ic.merge(parent_task_id: parent_ic.task_id)
    effective_config = effective_config.merge(invocation_context: child_ic)
  end

  agent_class.new.invoke_async(
    input,
    config: effective_config,
    thread_id: thread_id || ctx[:thread_id] || parent_ic&.thread_id
  ).await
end