Class: Phronomy::Agent::Orchestrator
Defined in: lib/phronomy/agent/orchestrator.rb
Overview
Base class for orchestrator agents that coordinate multiple subagents. Implements the Orchestrator-Subagent multi-agent coordination pattern (Anthropic blog, Pattern 2).
Extends Base with:
- A +subagent+ class-level DSL for declarative subagent registration. Each declared subagent is automatically exposed as an LLM-callable tool.
- +dispatch_parallel+ for programmatic parallel invocation of heterogeneous agents.
- +fan_out+ for parallel invocation of the same agent across multiple inputs.
Instance Attribute Summary
Attributes included from Concerns::BeforeCompletion
Class Method Summary
- ._subagent_tool_classes ⇒ Array<Class> (private)
  Returns the subagent tool classes registered on this specific class.
- .registered_subagents ⇒ Hash{Symbol => Hash}
  Returns the subagent registry for this specific class (not inherited).
- .subagent(name, agent_class, on_error: :raise) ⇒ Object
  Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.
Instance Method Summary
- #dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>
  Dispatches multiple heterogeneous agent tasks in parallel using cooperative Tasks.
- #fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>
  Runs the same agent against multiple inputs in parallel (fan-out pattern).
- #subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash
  Programmatically dispatches a single subagent from inside an orchestrator instance, inheriting the parent's +thread_id+ and +config+ by default.
Methods inherited from Base
#_add_handoff_tool, #_handoff_tools, _on_compact_callback, _on_compaction_trigger_callback, _on_trim_callback, cache_instructions, context_overhead, #context_version_cache, context_window, instructions, #invoke, #invoke_async, invoke_timeout, max_iterations, max_output_tokens, max_parallel_tools, model, on_compact, on_compaction_trigger, on_trim, provider, #run_as_child, static_knowledge, static_knowledge_chunks, static_knowledge_refresh!, static_knowledge_sources, #stream, temperature, tool_aliases, tools
Methods included from Concerns::Suspendable
#on_approval_required, #resume, #scope_policy=
Methods included from Concerns::BeforeCompletion
Methods included from Concerns::Guardrailable
#add_input_guardrail, #add_output_guardrail
Methods included from Concerns::Retryable
Methods included from Runnable
#batch, #invoke, #stream, #trace
Class Method Details
._subagent_tool_classes ⇒ Array<Class>
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Returns the subagent tool classes registered on this specific class. Used by #prepare_tool_class to inject context.
    # File 'lib/phronomy/agent/orchestrator.rb', line 109
    def self._subagent_tool_classes
      @_subagent_tool_classes || []
    end
.registered_subagents ⇒ Hash{Symbol => Hash}
Returns the subagent registry for this specific class (not inherited).
    # File 'lib/phronomy/agent/orchestrator.rb', line 117
    def self.registered_subagents
      @registered_subagents ||= {}
    end
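The "not inherited" behaviour follows from keeping the registry in a class-level instance variable, which belongs to the class object it is set on. The following is a minimal sketch of that pattern only; +SketchOrchestrator+, +ParentOrchestrator+ and +ChildOrchestrator+ are hypothetical stand-ins rather than Phronomy classes, and +String+ / +Integer+ merely take the place of agent classes.

```ruby
# Hypothetical stand-in for the registry pattern; not Phronomy code.
class SketchOrchestrator
  # A class-level instance variable lives on the class object itself,
  # so every subclass lazily gets its own empty Hash.
  def self.registered_subagents
    @registered_subagents ||= {}
  end

  # Simplified registration: records the entry, builds no tool class.
  def self.subagent(name, agent_class, on_error: :raise)
    registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
  end
end

class ParentOrchestrator < SketchOrchestrator
  subagent :researcher, String # String stands in for an agent class
end

class ChildOrchestrator < ParentOrchestrator
  subagent :writer, Integer, on_error: :skip
end

ParentOrchestrator.registered_subagents.keys # => [:researcher]
ChildOrchestrator.registered_subagents.keys  # => [:writer]
```

Under this pattern a subclass does not see its parent's entries, so anything that needs the full set would have to walk the ancestor chain itself.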
.subagent(name, agent_class, on_error: :raise) ⇒ Object
Declares a named subagent and registers it as a tool accessible to the LLM during an +invoke+ call.
Each call appends a new tool to this class's tool list. The generated tool's function name is +dispatch_to_<name>+.
    # File 'lib/phronomy/agent/orchestrator.rb', line 59
    def self.subagent(name, agent_class, on_error: :raise)
      tool_class = Class.new(Phronomy::Tool::Base) do
        tool_name "dispatch_to_#{name}"
        description "Dispatch work to the #{name} subagent (#{agent_class.name})"
        param :input, type: :string, desc: "The task or question for the subagent"

        # @_orchestrator_context is injected at call time by prepare_tool_class.
        attr_writer :_orchestrator_context

        define_method(:execute) do |input:|
          # Inherit the calling orchestrator's thread_id, config, and
          # InvocationContext so that child subagent spans and memory stay
          # connected to the parent invocation.
          ctx = @_orchestrator_context || {}
          parent_ic = ctx[:invocation_context]
          task_config = ctx[:config] || {}

          # Propagate parent InvocationContext to the child agent so that
          # cancellation, deadline, and tracing carry through automatically.
          if parent_ic && !task_config[:invocation_context]
            child_ic = parent_ic.merge(parent_task_id: parent_ic.task_id)
            task_config = task_config.merge(invocation_context: child_ic)
          end

          result = agent_class.new.invoke_async(
            input,
            thread_id: ctx[:thread_id] || parent_ic&.thread_id,
            config: task_config
          ).await
          result[:output]
        rescue
          raise if on_error == :raise
          nil
        end
      end

      # Track this tool class so prepare_tool_class can inject context.
      @_subagent_tool_classes = (@_subagent_tool_classes || []) + [tool_class]
      # Append without clobbering previously registered tools or aliases.
      @tools = (@tools || []) + [tool_class]
      @tool_aliases ||= {}
      registered_subagents[name] = {agent_class: agent_class, on_error: on_error}
    end
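The tool naming and the per-subagent +on_error+ handling in the listing above can be isolated in a few lines. This is a sketch under stated assumptions: +build_dispatch_tool+ is a hypothetical helper, a plain lambda replaces the real agent invocation, and the sketch rescues +StandardError+ explicitly (which is also what a bare +rescue+ catches).

```ruby
# Hypothetical sketch; build_dispatch_tool is not a Phronomy method.
def build_dispatch_tool(name, on_error: :raise, &agent)
  execute = lambda do |input|
    begin
      agent.call(input)
    rescue StandardError
      # :raise re-raises the original error; any other value yields nil.
      raise if on_error == :raise
      nil
    end
  end
  {tool_name: "dispatch_to_#{name}", execute: execute}
end

# A stand-in "agent" that always fails.
flaky = ->(input) { raise ArgumentError, "cannot handle #{input}" }

strict = build_dispatch_tool(:researcher, on_error: :raise, &flaky)
lenient = build_dispatch_tool(:researcher, on_error: :skip, &flaky)

strict[:tool_name]           # => "dispatch_to_researcher"
lenient[:execute].call("x")  # => nil (the failure is swallowed)
```

Calling +strict[:execute]+ with the same input would propagate the +ArgumentError+ to the caller.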
Instance Method Details
#dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>
Dispatches multiple heterogeneous agent tasks in parallel using cooperative Tasks. Each task is a Hash describing one agent invocation.
Results are returned in the same order as the input +tasks+ array. Concurrency is bounded by +max_concurrency+; when it is +nil+, all tasks run at once (the original behaviour).
Error semantics are controlled by +on_error+:
- +:raise+ (default) — every task runs to completion; the first exception in input order is then re-raised in the calling task.
- +:skip+ — failed tasks return +nil+; no exception is raised.
    # File 'lib/phronomy/agent/orchestrator.rb', line 158
    def dispatch_parallel(*tasks, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false)
      unless [:raise, :skip].include?(on_error)
        raise ArgumentError, "unknown on_error: #{on_error.inspect}"
      end
      if max_concurrency && !(max_concurrency.is_a?(Integer) && max_concurrency.positive?)
        raise ArgumentError, "max_concurrency must be a positive Integer"
      end
      bounded_map(tasks, max_concurrency: max_concurrency, on_error: on_error, timeout: timeout, cancellation_token: cancellation_token, invocation_context: invocation_context, force_kill: force_kill)
    end
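The ordering and +on_error+ semantics described for +dispatch_parallel+ can be illustrated without the library. The sketch below is an assumption-laden stand-in: +sketch_dispatch+ is a hypothetical helper, tasks are plain lambdas rather than task Hashes, and plain Ruby threads replace the cooperative Tasks used by +bounded_map+. Concurrency bounding, timeouts and cancellation are left out.

```ruby
# Hypothetical sketch; uses plain threads instead of cooperative Tasks.
def sketch_dispatch(tasks, on_error: :raise)
  unless [:raise, :skip].include?(on_error)
    raise ArgumentError, "unknown on_error: #{on_error.inspect}"
  end

  # Start every task and capture either its value or its error, so
  # one failure never prevents the other tasks from finishing.
  threads = tasks.map do |task|
    Thread.new do
      begin
        [:ok, task.call]
      rescue StandardError => e
        [:error, e]
      end
    end
  end
  outcomes = threads.map(&:value) # collected in input order

  if on_error == :raise
    first_failure = outcomes.find { |status, _| status == :error }
    raise first_failure.last if first_failure
  end

  # Under :skip, failed slots become nil and positions are preserved.
  outcomes.map { |status, value| status == :ok ? value : nil }
end

tasks = [-> { "a" }, -> { raise "boom" }, -> { "c" }]
sketch_dispatch(tasks, on_error: :skip) # => ["a", nil, "c"]
```

With +on_error: :raise+ the same call would raise the +RuntimeError+ from the second task, but only after all three tasks have finished.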
#fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false) ⇒ Array<Hash, nil>
Runs the same agent against multiple inputs in parallel (fan-out pattern).
Accepts the same +max_concurrency:+ and +on_error:+ keyword arguments as #dispatch_parallel and forwards them unchanged, along with +timeout:+, +cancellation_token:+, +invocation_context:+ and +force_kill:+.
    # File 'lib/phronomy/agent/orchestrator.rb', line 184
    def fan_out(agent:, inputs:, config: {}, thread_id: nil, max_concurrency: nil, on_error: :raise, timeout: nil, cancellation_token: nil, invocation_context: nil, force_kill: false)
      dispatch_parallel(
        *inputs.map { |input| {agent: agent, input: input, config: config, thread_id: thread_id} },
        max_concurrency: max_concurrency,
        on_error: on_error,
        timeout: timeout,
        cancellation_token: cancellation_token,
        invocation_context: invocation_context,
        force_kill: force_kill
      )
    end
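As the listing shows, +fan_out+ only builds one task Hash per input and delegates to +dispatch_parallel+. The sketch below isolates that mapping step; +fan_out_tasks+ is a hypothetical helper and +summary_agent+ is a placeholder class, neither of which exists in Phronomy.

```ruby
# Hypothetical helper showing the task Hashes that fan_out builds.
def fan_out_tasks(agent:, inputs:, config: {}, thread_id: nil)
  inputs.map do |input|
    {agent: agent, input: input, config: config, thread_id: thread_id}
  end
end

summary_agent = Class.new # placeholder for a real agent class
tasks = fan_out_tasks(
  agent: summary_agent,
  inputs: ["doc one", "doc two"],
  thread_id: "t-1"
)

tasks.map { |task| task[:input] } # => ["doc one", "doc two"]
```

Every task Hash references the same +config+ object and the same +thread_id+, so only the +input+ varies between the parallel invocations.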
#subagent(agent_class, input, config: nil, thread_id: nil) ⇒ Hash
Programmatically dispatches a single subagent from inside an orchestrator instance. Unless +thread_id+ or +config+ is passed explicitly, the parent's values are inherited.
    # File 'lib/phronomy/agent/orchestrator.rb', line 205
    def subagent(agent_class, input, config: nil, thread_id: nil)
      ctx = @_orchestrator_context || {}
      parent_ic = ctx[:invocation_context]
      effective_config = config || ctx[:config] || {}

      # Propagate parent InvocationContext to the child agent.
      if parent_ic && !effective_config[:invocation_context]
        child_ic = parent_ic.merge(parent_task_id: parent_ic.task_id)
        effective_config = effective_config.merge(invocation_context: child_ic)
      end

      agent_class.new.invoke_async(
        input,
        config: effective_config,
        thread_id: thread_id || ctx[:thread_id] || parent_ic&.thread_id
      ).await
    end
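The inheritance rules in the listing reduce to two fallback chains. The sketch below restates them in isolation; +ParentContext+, +resolve_thread_id+ and +resolve_config+ are hypothetical names introduced for illustration, and the Struct only mimics the +thread_id+ / +task_id+ readers that the listing calls on the parent InvocationContext.

```ruby
# Hypothetical stand-ins; ParentContext is not a Phronomy class.
ParentContext = Struct.new(:thread_id, :task_id)

# Explicit argument first, then the orchestrator context, then the
# parent invocation context (which may be absent).
def resolve_thread_id(explicit, ctx)
  explicit || ctx[:thread_id] || ctx[:invocation_context]&.thread_id
end

# An explicit config replaces the parent's config; it is not merged.
def resolve_config(explicit, ctx)
  explicit || ctx[:config] || {}
end

ctx = {
  thread_id: "parent-thread",
  config: {temperature: 0},
  invocation_context: ParentContext.new("ic-thread", "task-1")
}

resolve_thread_id(nil, ctx)              # => "parent-thread"
resolve_thread_id("explicit", ctx)       # => "explicit"
resolve_config({max_iterations: 3}, ctx) # => {max_iterations: 3}
```

Because the explicit +config+ wins outright, a caller that wants to keep the parent's settings and override only one key would need to merge the two Hashes before passing them in.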