Class: RobotLab::RactorNetworkScheduler
- Inherits:
-
Object
- Object
- RobotLab::RactorNetworkScheduler
- Defined in:
- lib/robot_lab/ractor_network_scheduler.rb
Overview
Schedules frozen robot task descriptions across Ractor workers.
Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.
Task ordering respects depends_on: tasks are only dispatched once all named dependencies have resolved (same topological semantics as SimpleFlow::Pipeline).
Constant Summary collapse
- QUEUE_CAPACITY =
Capacity for the work queue.
256
Instance Method Summary collapse
-
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
constructor
A new instance of RactorNetworkScheduler.
-
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
-
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
-
#shutdown ⇒ void
Gracefully shut down worker Ractors.
Constructor Details
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
Returns a new instance of RactorNetworkScheduler.
31 32 33 34 35 36 37 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 31 def initialize(memory:, pool_size: :auto) @memory = memory @work_q = RactorQueue.new(capacity: QUEUE_CAPACITY) @size = pool_size == :auto ? Etc.nprocessors : pool_size.to_i @workers = @size.times.map { spawn_worker(@work_q) } @closed = false end |
Instance Method Details
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 54 def run_pipeline(specs_with_deps, message:) completed = {} # name => result string remaining = specs_with_deps.dup until remaining.empty? ready, remaining = remaining.partition do |entry| deps = entry[:depends_on] deps == :none || deps == :optional || Array(deps).all? { |d| completed.key?(d) } end raise RobotLab::Error, "Circular dependency or unresolvable deps in RactorNetworkScheduler" if ready.empty? # Submit all ready tasks concurrently via threads. # report_on_exception is disabled because exceptions are propagated # to the caller via t.value — the default reporting is redundant noise. threads = ready.map do |entry| spec = entry[:spec] msg = completed.values.last || Thread.new { [spec.name, execute_spec(spec, msg)] }.tap { |t| t.report_on_exception = false } end threads.each do |t| name, result = t.value completed[name] = result end end completed end |
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
44 45 46 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 44 def run_spec(spec, message:) execute_spec(spec, ) end |
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shut down worker Ractors.
87 88 89 90 91 92 93 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 87 def shutdown return if @closed @closed = true @size.times { @work_q.push(nil) } @workers.each { |w| w.join rescue nil } end |