Class: Phronomy::Agent::Orchestrator
- Defined in: lib/phronomy/agent/orchestrator.rb
Overview
Base class for orchestrator agents that coordinate multiple subagents. Implements the Orchestrator-Subagent multi-agent coordination pattern (Anthropic blog, Pattern 2).
Extends Base with:
- A +subagent+ class-level DSL for declarative subagent registration. Each declared subagent is automatically exposed as an LLM-callable tool.
- +dispatch_parallel+ for programmatic parallel invocation of heterogeneous agents.
- +fan_out+ for parallel invocation of the same agent across multiple inputs.
Instance Attribute Summary
Attributes included from Concerns::BeforeCompletion
Class Method Summary
- .registered_subagents ⇒ Hash{Symbol => Hash}
  Returns the subagent registry for this specific class (not inherited).
- .subagent(name, agent_class, on_error: :raise) ⇒ Object
  Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.
Instance Method Summary
- #dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>
  Dispatches multiple heterogeneous agent tasks in parallel using Ruby threads.
- #fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>
  Runs the same agent against multiple inputs in parallel (fan-out pattern).
- #subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash
  Programmatically dispatches a single sub-agent from inside an orchestrator instance, inheriting the parent's +thread_id+ and +config+ by default.
Methods inherited from Base
#_add_handoff_tool, #_handoff_tools, _on_compact_callback, _on_compaction_trigger_callback, _on_trim_callback, cache_instructions, context_overhead, #context_version_cache, context_window, instructions, #invoke, invoke_timeout, max_iterations, max_output_tokens, max_parallel_tools, model, on_compact, on_compaction_trigger, on_trim, provider, #run_as_child, static_knowledge, static_knowledge_chunks, static_knowledge_refresh!, static_knowledge_sources, #stream, temperature, tool_aliases, tools
Methods included from Concerns::Suspendable
#on_approval_required, #resume
Methods included from Concerns::BeforeCompletion
Methods included from Concerns::Guardrailable
#add_input_guardrail, #add_output_guardrail
Methods included from Concerns::Retryable
Methods included from Runnable
#batch, #invoke, #stream, #trace
Class Method Details
.registered_subagents ⇒ Hash{Symbol => Hash}
Returns the subagent registry for this specific class (not inherited).
    # File 'lib/phronomy/agent/orchestrator.rb', line 92
    def self.registered_subagents
      @registered_subagents ||= {}
    end
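The "not inherited" behaviour follows from the memoization into a class-level instance variable, which Ruby stores on each class object separately. A minimal sketch of that effect, assuming hypothetical stand-in classes (+BaseSketch+, +ParentOrchestrator+, +ChildOrchestrator+) in place of the real Phronomy classes:

```ruby
# Stand-in for the registry accessor; this is not the Phronomy class itself.
class BaseSketch
  def self.registered_subagents
    # Class-level instance variable: every class object holds its own copy.
    @registered_subagents ||= {}
  end
end

class ParentOrchestrator < BaseSketch; end
class ChildOrchestrator < ParentOrchestrator; end

ParentOrchestrator.registered_subagents[:researcher] = {on_error: :raise}

parent_keys = ParentOrchestrator.registered_subagents.keys # => [:researcher]
child_keys = ChildOrchestrator.registered_subagents.keys   # => [] (nothing inherited)
```

As far as this registry is concerned, a subclass therefore starts empty and would have to redeclare any subagents it needs. Whether the inherited +tools+ list behaves differently is not shown on this page.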
.subagent(name, agent_class, on_error: :raise) ⇒ Object
Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.
Each call appends a new tool to this class's tool list. The generated
tool's function name is +dispatch_to_
    # File 'lib/phronomy/agent/orchestrator.rb', line 59
    def self.subagent(name, agent_class, on_error: :raise)
      tool_class = Class.new(Phronomy::Tool::Base) do
        tool_name "dispatch_to_#{name}"
        description "Dispatch work to the #{name} subagent (#{agent_class.name})"
        param :input, type: :string, desc: "The task or question for the subagent"

        define_method(:execute) do |input:|
          # Inherit the calling orchestrator's thread_id and config when
          # available so that sub-agent spans and memory stay connected.
          ctx = Thread.current[:phronomy_orchestrator_context] || {}
          result = agent_class.new.invoke(
            input,
            thread_id: ctx[:thread_id],
            config: ctx[:config] || {}
          )
          result[:output]
        rescue
          raise if on_error == :raise
          nil
        end
      end

      # Append without clobbering previously registered tools or aliases.
      @tools = (@tools || []) + [tool_class]
      @tool_aliases ||= {}
      registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
    end
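The error handling inside the generated tool can be tried in isolation. The sketch below reproduces only the +rescue+ branch of +execute+. +build_tool_class+ and the +failing+ lambda are hypothetical names introduced for illustration, and the anonymous class does not inherit from Phronomy::Tool::Base:

```ruby
# Hypothetical helper that mirrors the rescue logic of the generated tool.
def build_tool_class(on_error:, &work)
  Class.new do
    define_method(:execute) do |input:|
      work.call(input)
    rescue
      # A bare rescue catches StandardError and its subclasses only.
      raise if on_error == :raise
      nil
    end
  end
end

# Stand-in for a subagent invocation that always fails.
failing = ->(_input) { raise ArgumentError, "subagent failed" }

skipping_tool = build_tool_class(on_error: :skip, &failing).new
skipped = skipping_tool.execute(input: "summarise") # => nil, error swallowed

raising_tool = build_tool_class(on_error: :raise, &failing).new
raised =
  begin
    raising_tool.execute(input: "summarise")
    false
  rescue ArgumentError
    true
  end
# raised => true
```

In the listing above, the class-level DSL does not validate +on_error+ itself, so any value other than +:raise+ takes the +nil+ branch.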
Instance Method Details
#dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>
Dispatches multiple heterogeneous agent tasks in parallel using Ruby threads. Each task is a Hash describing one agent invocation.
Results are returned in the same order as the input +tasks+ array. Concurrency is bounded by +max_concurrency+; when nil all tasks run at once (original behaviour).
Error semantics are controlled by +on_error+:
- +:raise+ (default) — every task runs to completion; the first exception in input order is then re-raised in the calling thread.
- +:skip+ — failed tasks return +nil+; no exception is raised.
    # File 'lib/phronomy/agent/orchestrator.rb', line 133
    def dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false)
      unless [:raise, :skip].include?(on_error)
        raise ArgumentError, "unknown on_error: #{on_error.inspect}"
      end
      if max_concurrency && !(max_concurrency.is_a?(Integer) && max_concurrency.positive?)
        raise ArgumentError, "max_concurrency must be a positive Integer"
      end

      bounded_map(tasks, max_concurrency: max_concurrency, on_error: on_error, timeout: timeout, cancellation_token: cancellation_token, force_kill: force_kill)
    end
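The +bounded_map+ helper that does the actual work is not shown on this page. The following is a minimal sketch of one way to get the documented behaviour (ordered results, a concurrency bound, and the two +on_error+ modes) with plain Ruby threads. +bounded_map_sketch+ is a hypothetical name, it is not the library's implementation, and it leaves out +timeout+, +cancellation_token+, and +force_kill+ entirely:

```ruby
# Hypothetical stand-in for the private bounded_map helper, which this page
# does not show. Timeout and cancellation handling are deliberately omitted.
def bounded_map_sketch(items, max_concurrency: nil, on_error: :raise, &block)
  results = Array.new(items.size)
  errors = Array.new(items.size)

  # Pre-load a queue of [item, index] pairs, then close it so that pop
  # returns nil once the queue has been drained.
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close

  # nil means "no bound": one worker thread per item.
  worker_count = [max_concurrency || items.size, items.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        begin
          results[index] = block.call(item)
        rescue StandardError => e
          errors[index] = e
        end
      end
    end
  end
  workers.each(&:join)

  # :raise re-raises the first failure in input order, after every task ran.
  first_error = errors.compact.first
  raise first_error if first_error && on_error == :raise
  results
end

squares = bounded_map_sketch([1, 2, 3], max_concurrency: 2) { |n| n * n }
# => [1, 4, 9]

partial = bounded_map_sketch([1, 0, 2], on_error: :skip) { |n| 10 / n }
# => [10, nil, 5]
```

Writing each result to its own pre-allocated index keeps the output aligned with the input order regardless of which thread finishes first.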
#fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false) ⇒ Array<Hash, nil>
Runs the same agent against multiple inputs in parallel (fan-out pattern).
Accepts the same +max_concurrency:+, +on_error:+, +timeout:+, +cancellation_token:+, and +force_kill:+ keyword arguments as #dispatch_parallel and forwards them unchanged.
    # File 'lib/phronomy/agent/orchestrator.rb', line 157
    def fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, force_kill: false)
      dispatch_parallel(
        *inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} },
        max_concurrency: max_concurrency,
        on_error: on_error,
        timeout: timeout,
        cancellation_token: cancellation_token,
        force_kill: force_kill
      )
    end
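The listing also shows the shape of a task Hash as +fan_out+ builds it: the keys +:agent+, +:input+, +:config+, and +:thread_id+. The sketch below isolates that expansion step so it can be inspected without running any agent. +fan_out_tasks+ and +SummaryAgent+ are hypothetical names used only for illustration:

```ruby
# Hypothetical helper isolating the input-to-task expansion from the listing.
def fan_out_tasks(agent:, inputs:, config: {}, thread_id: nil)
  inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} }
end

# SummaryAgent is a placeholder class, not part of Phronomy.
SummaryAgent = Class.new

tasks = fan_out_tasks(agent: SummaryAgent, inputs: ["doc A", "doc B"], thread_id: "t-1")
# => [{agent: SummaryAgent, input: "doc A", config: {}, thread_id: "t-1"},
#     {agent: SummaryAgent, input: "doc B", config: {}, thread_id: "t-1"}]
```

Every task receives the same +config+ object rather than a copy, so a subagent that mutates its config would affect its siblings.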
#subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash
Programmatically dispatches a single sub-agent from inside an orchestrator instance, inheriting the parent's +thread_id+ and +config+ by default.
    # File 'lib/phronomy/agent/orchestrator.rb', line 177
    def subagent(agent_class, input, config: nil, thread_id: nil)
      ctx = Thread.current[:phronomy_orchestrator_context] || {}
      agent_class.new.invoke(
        input,
        config: config || ctx[:config] || {},
        thread_id: thread_id || ctx[:thread_id]
      )
    end
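The inheritance rule is a simple precedence chain: an explicit argument wins, then the value stored under +Thread.current[:phronomy_orchestrator_context]+, then a default. The sketch below isolates that resolution step. +resolve_dispatch_options+ is a hypothetical helper, and the context is set by hand here on the assumption that the orchestrator populates it during +invoke+:

```ruby
# Hypothetical helper isolating the option-resolution step of #subagent.
def resolve_dispatch_options(config: nil, thread_id: nil)
  ctx = Thread.current[:phronomy_orchestrator_context] || {}
  {
    config: config || ctx[:config] || {},
    thread_id: thread_id || ctx[:thread_id]
  }
end

# Simulate the context an orchestrator would presumably set during invoke.
Thread.current[:phronomy_orchestrator_context] = {thread_id: "parent-1", config: {locale: "en"}}

inherited = resolve_dispatch_options
# => {config: {locale: "en"}, thread_id: "parent-1"}

overridden = resolve_dispatch_options(config: {}, thread_id: "child-7")
# => {config: {}, thread_id: "child-7"}

# Without any orchestrator context, the defaults apply.
Thread.current[:phronomy_orchestrator_context] = nil
no_context = resolve_dispatch_options
# => {config: {}, thread_id: nil}
```

Because an empty Hash is truthy in Ruby, passing +config: {}+ replaces the parent's config rather than falling back to it.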