Class: RobotLab::RactorNetworkScheduler

Inherits:
Object
Defined in:
lib/robot_lab/ractor_network_scheduler.rb

Overview

Schedules frozen robot task descriptions across Ractor workers.

Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.
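A frozen payload matters because only shareable (deeply frozen) objects can cross a Ractor boundary without being copied or moved. The sketch below shows one way to produce such a payload using Ruby's `Ractor.make_shareable`. `SketchSpec` and its fields are hypothetical stand-ins; the real RobotSpec layout is not shown on this page, and the library may freeze its specs differently.

```ruby
# Hypothetical stand-in for RobotSpec; field names are assumptions.
SketchSpec = Struct.new(:name, :system_prompt, keyword_init: true)

spec = SketchSpec.new(name: "analyst", system_prompt: "Summarize the input")

# Ractor.make_shareable deep-freezes the object and everything it references,
# then returns the same object.
shareable_spec = Ractor.make_shareable(spec)

struct_frozen = shareable_spec.frozen?            # the struct itself
field_frozen  = shareable_spec.name.frozen?       # and its string fields
shareable     = Ractor.shareable?(shareable_spec) # safe to pass to a Ractor
```

After this call, any attempt to mutate the spec or its strings raises `FrozenError`, so a worker cannot accidentally change state that another worker sees.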

Task ordering respects depends_on: tasks are only dispatched once all named dependencies have resolved (same topological semantics as SimpleFlow::Pipeline).
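The dependency rule can be illustrated without any robots or Ractors. The following is a minimal sketch that groups entries into dispatch "waves"; `dispatch_waves` and the `:name` key are hypothetical helpers for illustration only (the scheduler itself reads the name from the spec).

```ruby
# Group entries into dispatch waves. An entry becomes ready once every
# dependency it names has completed in an earlier wave.
def dispatch_waves(entries)
  done = []
  remaining = entries.dup
  waves = []

  until remaining.empty?
    ready, remaining = remaining.partition do |entry|
      deps = entry[:depends_on]
      deps == :none || deps == :optional ||
        Array(deps).all? { |d| done.include?(d) }
    end
    # Nothing became ready: the remaining entries can never run.
    raise ArgumentError, "circular or unresolvable dependencies" if ready.empty?

    names = ready.map { |entry| entry[:name] }
    waves << names
    done.concat(names)
  end

  waves
end

waves = dispatch_waves([
  { name: "writer",  depends_on: ["analyst"] },
  { name: "analyst", depends_on: :none },
  { name: "editor",  depends_on: ["analyst", "writer"] }
])
# waves == [["analyst"], ["writer"], ["editor"]]
```

Input order does not matter: "writer" is listed first but waits until "analyst" has completed.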

Examples:

scheduler = RactorNetworkScheduler.new(memory: shared_memory)
scheduler.run_pipeline([
  { spec: analyst_spec, depends_on: :none },
  { spec: writer_spec,  depends_on: ["analyst"] }
], message: "Process this")
scheduler.shutdown

Constant Summary

QUEUE_CAPACITY = 256

Capacity for the work queue.

Constructor Details

#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler

Returns a new instance of RactorNetworkScheduler.

Parameters:

  • memory (Memory)

    shared network memory for all robot tasks

  • pool_size (Integer, :auto) (defaults to: :auto)

    number of Ractor workers



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 31

def initialize(memory:, pool_size: :auto)
  @memory   = memory
  @work_q   = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @size     = pool_size == :auto ? Etc.nprocessors : pool_size.to_i
  @workers  = @size.times.map { spawn_worker(@work_q) }
  @closed   = false
end
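The pool-size expression in the constructor can be tried in isolation. This is a small sketch; `resolve_pool_size` is a hypothetical helper that copies the expression and is not part of the class.

```ruby
require "etc"

# Same expression the constructor uses to size the worker pool.
def resolve_pool_size(pool_size)
  pool_size == :auto ? Etc.nprocessors : pool_size.to_i
end

auto_size   = resolve_pool_size(:auto) # number of processors Ruby reports
fixed_size  = resolve_pool_size(4)     # => 4
string_size = resolve_pool_size("8")   # => 8, because of #to_i
nil_size    = resolve_pool_size(nil)   # => 0, because nil.to_i is 0
```

Because of `to_i`, passing `nil` or a non-numeric string yields 0, and `0.times.map` spawns no workers, so callers may want to validate the value before constructing the scheduler.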

Instance Method Details

#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>

Run a pipeline of specs in dependency order.

Parameters:

  • specs_with_deps (Array<Hash>)

    each entry has :spec and :depends_on; :depends_on is :none, :optional, or an Array<String> of spec names

  • message (String)

    initial message passed to entry-point robots

Returns:

  • (Hash<String, String>)

    name => result for each completed robot



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 54

def run_pipeline(specs_with_deps, message:)
  completed = {}  # name => result string
  remaining = specs_with_deps.dup

  until remaining.empty?
    ready, remaining = remaining.partition do |entry|
      deps = entry[:depends_on]
      deps == :none || deps == :optional ||
        Array(deps).all? { |d| completed.key?(d) }
    end

    raise RobotLab::Error, "Circular dependency or unresolvable deps in RactorNetworkScheduler" if ready.empty?

    # Submit all ready tasks concurrently via threads.
    # report_on_exception is disabled because exceptions are propagated
    # to the caller via t.value — the default reporting is redundant noise.
    threads = ready.map do |entry|
      spec = entry[:spec]
      msg  = completed.values.last || message
      Thread.new { [spec.name, execute_spec(spec, msg)] }.tap { |t| t.report_on_exception = false }
    end

    threads.each do |t|
      name, result = t.value
      completed[name] = result
    end
  end

  completed
end
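The comment about `report_on_exception` relies on standard Ruby behavior: `Thread#value` joins the thread and re-raises any exception that terminated it. The sketch below demonstrates this with plain threads; the task names and the failure are made up for illustration.

```ruby
# Two fake tasks; "writer" fails inside its thread.
threads = %w[analyst writer].map do |name|
  Thread.new do
    raise ArgumentError, "#{name} failed" if name == "writer"
    [name, "#{name} done"]
  end.tap { |t| t.report_on_exception = false } # suppress the stderr report
end

results = {}
errors  = []
threads.each do |t|
  begin
    name, result = t.value # joins the thread; re-raises its exception here
    results[name] = result
  rescue ArgumentError => e
    errors << e.message
  end
end
# results == { "analyst" => "analyst done" }
# errors  == ["writer failed"]
```

In `run_pipeline` there is no rescue around `t.value`, so the first failing task aborts the pipeline and its exception reaches the caller. Note also that every task in a wave receives `completed.values.last || message`: the most recently recorded result, or the initial message when nothing has completed yet.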

#run_spec(spec, message:) ⇒ String

Run a single spec and return the result string.

Parameters:

  • spec (RobotSpec)

    the frozen spec to run

  • message (String)

    message passed to the robot

Returns:

  • (String)

    the robot’s last_text_content



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 44

def run_spec(spec, message:)
  execute_spec(spec, message)
end

#shutdown ⇒ void

This method returns an undefined value.

Gracefully shut down worker Ractors.



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 87

def shutdown
  return if @closed

  @closed = true
  @size.times { @work_q.push(nil) }
  @workers.each { |w| w.join rescue nil }
end
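Pushing one `nil` per worker is the sentinel ("poison pill") shutdown pattern. The sketch below shows it with Ruby's built-in `Queue` and threads standing in for RactorQueue and Ractor workers; it assumes each worker exits its loop when it pops `nil`, which the worker source (not shown on this page) would need to confirm.

```ruby
queue     = Queue.new
processed = Queue.new
pool_size = 3

# Each worker loops until it pops the nil sentinel.
workers = pool_size.times.map do
  Thread.new do
    while (job = queue.pop)
      processed << job * 2
    end
  end
end

[1, 2, 3, 4].each { |n| queue << n }

# One sentinel per worker: each worker consumes exactly one nil and exits.
pool_size.times { queue << nil }
workers.each(&:join)

results = Array.new(processed.size) { processed.pop }.sort
# results == [2, 4, 6, 8]
```

Because the sentinels are queued behind the pending jobs, all work submitted before shutdown is drained first. The `@closed` guard in `shutdown` makes a second call a no-op, so sentinels are never pushed twice.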