Class: RobotLab::RactorNetworkScheduler

Inherits:
Object
Defined in:
lib/robot_lab/ractor_network_scheduler.rb

Overview

Schedules frozen robot task descriptions across Ractor workers.

Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.
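As a rough illustration of what a "frozen payload" means here, the sketch below deep-freezes a small value object with Ractor.make_shareable. SketchSpec is a hypothetical stand-in; the real RobotSpec class and its fields are not shown on this page.

```ruby
# Hypothetical stand-in for RobotSpec (assumption: the real class is not shown here).
SketchSpec = Struct.new(:name, :system_prompt, keyword_init: true)

spec = SketchSpec.new(name: "analyst", system_prompt: "You analyse data.")

# Ractor.make_shareable deep-freezes the object and everything it references,
# which is what allows it to be passed to another Ractor without copying.
frozen_spec = Ractor.make_shareable(spec)

spec_frozen = frozen_spec.frozen?             # the struct itself is frozen
name_frozen = frozen_spec.name.frozen?        # nested strings are frozen too
shareable   = Ractor.shareable?(frozen_spec)  # safe to hand to a worker Ractor
```

Any later attempt to mutate the payload (for example appending to frozen_spec.name) raises FrozenError, so workers cannot alter a spec they were handed.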

Task ordering respects depends_on: a task is dispatched only after all of its named dependencies have resolved (the same topological semantics as SimpleFlow::Pipeline). Entries whose depends_on is :none or :optional are treated as ready immediately.
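The dependency rule can be sketched without any Ractors or robots. The helper below, dependency_waves, is hypothetical and only groups entry names into the "waves" that would become ready together; it uses plain Hashes with a :name key instead of RobotSpec objects.

```ruby
# Hypothetical helper: groups entries into waves that could be dispatched together.
# Each entry is a Hash with :name and :depends_on (:none, :optional, or an Array of names).
def dependency_waves(entries)
  completed = []           # names resolved so far
  remaining = entries.dup
  waves     = []

  until remaining.empty?
    ready, remaining = remaining.partition do |entry|
      deps = entry[:depends_on]
      deps == :none || deps == :optional ||
        Array(deps).all? { |d| completed.include?(d) }
    end

    # No progress means a cycle or a dependency that is never defined.
    raise ArgumentError, "circular or unresolvable dependency" if ready.empty?

    names = ready.map { |entry| entry[:name] }
    waves << names
    completed.concat(names)
  end

  waves
end

entries = [
  { name: "writer",  depends_on: ["analyst"] },
  { name: "analyst", depends_on: :none },
  { name: "editor",  depends_on: ["analyst", "writer"] }
]

waves = dependency_waves(entries)
# waves is [["analyst"], ["writer"], ["editor"]]
```

Input order does not matter: "writer" is listed first but waits until "analyst" has completed.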

Examples:

scheduler = RactorNetworkScheduler.new(memory: shared_memory)
scheduler.run_pipeline([
  { spec: analyst_spec, depends_on: :none },
  { spec: writer_spec,  depends_on: ["analyst"] }
], message: "Process this")
scheduler.shutdown

Constant Summary

QUEUE_CAPACITY = 256

Instance Method Summary

Constructor Details

#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler

Returns a new instance of RactorNetworkScheduler.

Parameters:

  • memory (Memory)

    shared network memory for all robot tasks

  • pool_size (Integer, :auto) (defaults to: :auto)

    number of Ractor workers; :auto uses Etc.nprocessors



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 27

def initialize(memory:, pool_size: :auto)
  @memory  = memory
  @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @size    = pool_size == :auto ? Etc.nprocessors : pool_size.to_i
  @workers = @size.times.map { spawn_worker(@work_q) }
  @closed  = false
end
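The pool_size handling in the constructor can be tried in isolation. resolve_pool_size below is a hypothetical helper that copies the same expression; it is not part of the class.

```ruby
require "etc"

# Hypothetical helper mirroring the constructor's pool_size expression.
def resolve_pool_size(pool_size)
  pool_size == :auto ? Etc.nprocessors : pool_size.to_i
end

auto_size  = resolve_pool_size(:auto)   # processor count reported by Etc
fixed_size = resolve_pool_size(4)       # explicit worker count
text_size  = resolve_pool_size("8")     # String#to_i coerces "8" to 8
bad_size   = resolve_pool_size("many")  # non-numeric strings coerce to 0
```

Because to_i never raises for strings, a non-numeric pool_size silently resolves to 0, in which case no workers would be spawned. Callers may want to validate the value before constructing the scheduler.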

Instance Method Details

#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>

Run a pipeline of specs in dependency order.

Parameters:

  • specs_with_deps (Array<Hash>)

    each entry is a Hash with :spec and :depends_on (:none, :optional, or an Array of robot names)

  • message (String)

    initial message passed to entry-point robots

Returns:

  • (Hash<String, String>)

    name => result for each completed robot

Raises:

  • (RobotLab::Error)

    if the dependencies are circular or cannot be resolved



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 48

def run_pipeline(specs_with_deps, message:)
  completed = {}
  remaining = specs_with_deps.dup

  until remaining.empty?
    ready, remaining = remaining.partition do |entry|
      deps = entry[:depends_on]
      deps == :none || deps == :optional ||
        Array(deps).all? { |d| completed.key?(d) }
    end

    raise RobotLab::Error, "Circular dependency or unresolvable deps in RactorNetworkScheduler" if ready.empty?

    threads = ready.map do |entry|
      spec = entry[:spec]
      msg  = completed.values.last || message
      Thread.new { [spec.name, execute_spec(spec, msg)] }.tap { |t| t.report_on_exception = false }
    end

    threads.each do |t|
      name, result = t.value
      completed[name] = result
    end
  end

  completed
end
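One detail of the listing above is how each wave's input message is chosen: completed.values.last || message. The sketch below isolates that expression; next_message is a hypothetical name used only for illustration.

```ruby
# Stand-in for the message selection inside run_pipeline.
# `completed` maps robot name => result, in the order results were recorded.
def next_message(completed, initial_message)
  completed.values.last || initial_message
end

completed = {}

# Nothing has completed yet, so the first wave receives the initial message.
first_wave_msg = next_message(completed, "Process this")

# Ruby hashes preserve insertion order, so values.last is the most recently
# recorded result, regardless of which robots a later task depends on.
completed["analyst"] = "analysis text"
completed["critic"]  = "critique text"
second_wave_msg = next_message(completed, "Process this")
```

Under this rule a task that depends only on "analyst" would still receive the "critic" output if "critic" was recorded last, so the order of entries within a wave affects what downstream robots see.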

#run_spec(spec, message:) ⇒ String

Run a single spec and return the result string.

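Parameters:

  • spec (RobotSpec)

    frozen task description to execute

  • message (String)

    message passed to the robot

Returns: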

  • (String)

    the robot’s last_text_content



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 39

def run_spec(spec, message:)
  execute_spec(spec, message)
end

#shutdown ⇒ Object

Gracefully shut down worker Ractors.



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 77

def shutdown
  return if @closed

  @closed = true
  @size.times { @work_q.push(nil) }
  @workers.each { |w| w.join rescue nil }
end
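The shutdown method uses a sentinel pattern: one nil per worker is pushed onto the work queue, then each worker is joined. The sketch below shows the same pattern with the standard library's Queue and Thread standing in for RactorQueue and Ractor workers. This is a swapped-in technique for illustration, not the library's worker loop, which is not shown on this page.

```ruby
# Thread-based stand-in: Queue replaces RactorQueue, Thread replaces Ractor.
work_q  = Queue.new
results = Queue.new
size    = 3

workers = size.times.map do
  Thread.new do
    # Each worker loops until it pops the nil sentinel.
    while (job = work_q.pop)
      results << job * 2
    end
  end
end

[1, 2, 3, 4].each { |n| work_q << n }

# One sentinel per worker: each worker exits after consuming exactly one nil,
# and the FIFO queue ensures all real jobs are taken first.
size.times { work_q << nil }
workers.each(&:join)

processed = []
processed << results.pop until results.empty?
```

Pushing exactly as many sentinels as there are workers matters: fewer would leave some workers blocked on pop forever, and join would never return.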