Class: Ragents::Orchestrator

Inherits:
Object
  • Object
show all
Defined in:
lib/ragents/orchestrator.rb

Overview

Orchestrator for coordinating multiple agents. Supports parallel execution, sequential workflows, and agent routing.

Examples:

Basic usage

orchestrator = Ragents::Orchestrator.new(provider: my_provider)
orchestrator.register(:researcher, ResearchAgent)
orchestrator.register(:writer, WriterAgent)

result = orchestrator.run(:researcher, input: "Research topic X")

Parallel execution

results = orchestrator.parallel(
  [:researcher, { input: "Topic A" }],
  [:researcher, { input: "Topic B" }]
)

Sequential workflow

result = orchestrator.workflow do |w|
  w.step(:researcher, input: "Research X")
  w.step(:writer) { |ctx| { input: "Write about: #{ctx.last_response}" } }
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(provider: nil, **options) ⇒ Orchestrator

Returns a new instance of Orchestrator.



29
30
31
32
33
34
# File 'lib/ragents/orchestrator.rb', line 29

def initialize(provider: nil, **options)
  @provider = provider
  @agents = {}
  @contexts = {}
  @options = options
end

Instance Attribute Details

#agentsObject (readonly)

Returns the value of attribute agents.



27
28
29
# File 'lib/ragents/orchestrator.rb', line 27

def agents
  @agents
end

#contextsObject (readonly)

Returns the value of attribute contexts.



27
28
29
# File 'lib/ragents/orchestrator.rb', line 27

def contexts
  @contexts
end

#providerObject (readonly)

Returns the value of attribute provider.



27
28
29
# File 'lib/ragents/orchestrator.rb', line 27

def provider
  @provider
end

Instance Method Details

#fetch_context(name) ⇒ Object

Retrieve a stored context



115
116
117
# File 'lib/ragents/orchestrator.rb', line 115

def fetch_context(name)
  @contexts[name.to_sym]
end

#parallel(*tasks) ⇒ Array<Context>

Run multiple agents in parallel using Ractors

Parameters:

  • tasks (Array<Array>)

    Array of [name, options] tuples

Returns:

  • (Array<Context>)

    Results from all agents



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ragents/orchestrator.rb', line 58

def parallel(*tasks)
  ractors = tasks.map do |(name, opts)|
    opts ||= {}
    agent_config = @agents[name.to_sym]
    raise ArgumentError, "Unknown agent: #{name}" unless agent_config

    agent_class = agent_config[:class]
    merged_opts = agent_config[:options].merge(opts)
    context = merged_opts.delete(:context)
    input = merged_opts.delete(:input)

    agent_class.run_async(
      provider: @provider,
      context: context,
      input: input,
      **merged_opts
    )
  end

  # Collect results from all Ractors (Ruby 4.x API)
  ractors.map(&:value)
end

#register(name, agent_class, **options) ⇒ Object

Register an agent class with a name

Parameters:

  • name (Symbol)

    The agent identifier

  • agent_class (Class)

    The Agent subclass

  • options (Hash)

    Default options for this agent



40
41
42
43
# File 'lib/ragents/orchestrator.rb', line 40

def register(name, agent_class, **options)
  @agents[name.to_sym] = { class: agent_class, options: options }
  self
end

#run(name, input: nil, context: nil, **options) ⇒ Context

Run a single agent

Parameters:

  • name (Symbol)

    The agent name

  • input (String) (defaults to: nil)

    Input for the agent

  • context (Context) (defaults to: nil)

    Optional initial context

Returns:

  • (Context)

    The resulting context



50
51
52
53
# File 'lib/ragents/orchestrator.rb', line 50

def run(name, input: nil, context: nil, **options)
  agent = build_agent(name, context: context, **options)
  agent.run(input: input)
end

#store_context(name, context) ⇒ Object

Store a context for later retrieval



110
111
112
# File 'lib/ragents/orchestrator.rb', line 110

def store_context(name, context)
  @contexts[name.to_sym] = context
end

#supervised(name, max_restarts: 3, restart_delay: 1, **options) ⇒ Object

Run agents with supervisor pattern

Parameters:

  • name (Symbol)

    The agent name

  • max_restarts (Integer) (defaults to: 3)

    Maximum restart attempts

  • restart_delay (Numeric) (defaults to: 1)

    Delay between restarts



94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/ragents/orchestrator.rb', line 94

def supervised(name, max_restarts: 3, restart_delay: 1, **options)
  restarts = 0

  begin
    run(name, **options)
  rescue StandardError => e
    restarts += 1
    if restarts <= max_restarts
      sleep(restart_delay)
      retry
    end
    raise SupervisorError, "Agent #{name} failed after #{max_restarts} restarts: #{e.message}"
  end
end

#workflow {|Workflow| ... } ⇒ Context

Execute a sequential workflow

Yields:

Returns:



84
85
86
87
88
# File 'lib/ragents/orchestrator.rb', line 84

def workflow(&block)
  wf = Workflow.new(self)
  block.call(wf)
  wf.execute
end