Class: RobotLab::RactorNetworkScheduler
- Inherits:
-
Object
- Object
- RobotLab::RactorNetworkScheduler
- Defined in:
- lib/robot_lab/ractor_network_scheduler.rb
Overview
Schedules frozen robot task descriptions across Ractor workers.
Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.
Task ordering respects depends_on: tasks are only dispatched once all named dependencies have resolved (same topological semantics as SimpleFlow::Pipeline).
Constant Summary collapse
- QUEUE_CAPACITY =
256
Instance Method Summary collapse
-
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
constructor
A new instance of RactorNetworkScheduler.
-
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
-
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
-
#shutdown ⇒ Object
Gracefully shut down worker Ractors.
Constructor Details
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
Returns a new instance of RactorNetworkScheduler.
27 28 29 30 31 32 33 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 27 def initialize(memory:, pool_size: :auto) @memory = memory @work_q = RactorQueue.new(capacity: QUEUE_CAPACITY) @size = pool_size == :auto ? Etc.nprocessors : pool_size.to_i @workers = @size.times.map { spawn_worker(@work_q) } @closed = false end |
Instance Method Details
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 48 def run_pipeline(specs_with_deps, message:) completed = {} remaining = specs_with_deps.dup until remaining.empty? ready, remaining = remaining.partition do |entry| deps = entry[:depends_on] deps == :none || deps == :optional || Array(deps).all? { |d| completed.key?(d) } end raise RobotLab::Error, "Circular dependency or unresolvable deps in RactorNetworkScheduler" if ready.empty? threads = ready.map do |entry| spec = entry[:spec] msg = completed.values.last || Thread.new { [spec.name, execute_spec(spec, msg)] }.tap { |t| t.report_on_exception = false } end threads.each do |t| name, result = t.value completed[name] = result end end completed end |
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
39 40 41 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 39 def run_spec(spec, message:) execute_spec(spec, ) end |
#shutdown ⇒ Object
Gracefully shut down worker Ractors.
77 78 79 80 81 82 83 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 77 def shutdown return if @closed @closed = true @size.times { @work_q.push(nil) } @workers.each { |w| w.join rescue nil } end |