Class: RobotLab::RactorNetworkScheduler
- Inherits:
-
Object
- Object
- RobotLab::RactorNetworkScheduler
- Defined in:
- lib/robot_lab/ractor_network_scheduler.rb
Overview
Schedules frozen robot task descriptions across Ractor workers.
Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.
Task ordering respects depends_on: tasks are only dispatched once all named dependencies have resolved (same topological semantics as SimpleFlow::Pipeline).
Constant Summary collapse
- QUEUE_CAPACITY =
256
Class Method Summary collapse
- .build_and_run_robot(spec, message) ⇒ Object
-
.process_job(job) ⇒ Object
Called inside Ractor worker blocks — must be a class method.
- .wrap_error(err) ⇒ Object
Instance Method Summary collapse
-
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
constructor
A new instance of RactorNetworkScheduler.
-
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
-
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
-
#shutdown ⇒ Object
Gracefully shut down worker Ractors.
Constructor Details
#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler
Returns a new instance of RactorNetworkScheduler.
27 28 29 30 31 32 33 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 27 def initialize(memory:, pool_size: :auto) @memory = memory @work_q = RactorQueue.new(capacity: QUEUE_CAPACITY) @size = pool_size == :auto ? Etc.nprocessors : pool_size.to_i @workers = @size.times.map { spawn_worker(@work_q) } @closed = false end |
Class Method Details
.build_and_run_robot(spec, message) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 87 def self.build_and_run_robot(spec, ) config = spec.config_hash.empty? ? nil : RobotLab::RunConfig.new(**spec.config_hash.transform_keys(&:to_sym)) robot = RobotLab::Robot.new( name: spec.name, template: spec.template&.to_sym, system_prompt: spec.system_prompt, config: config ) robot.run().last_text_content.to_s.freeze end |
.process_job(job) ⇒ Object
Called inside Ractor worker blocks — must be a class method.
79 80 81 82 83 84 85 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 79 def self.process_job(job) spec = job.payload[:spec] = job.payload[:message] job.reply_queue.push(build_and_run_robot(spec, )) rescue StandardError => e job.reply_queue.push(wrap_error(e)) end |
.wrap_error(err) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 98 def self.wrap_error(err) RobotLab::RactorJobError.new( message: err..freeze, backtrace: (err.backtrace || []).map(&:freeze).freeze ) end |
Instance Method Details
#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>
Run a pipeline of specs in dependency order.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 48 def run_pipeline(specs_with_deps, message:) completed = {} remaining = specs_with_deps.dup until remaining.empty? ready, remaining = partition_ready(remaining, completed) raise RobotLab::Error, 'Circular dependency or unresolvable deps in RactorNetworkScheduler' if ready.empty? dispatch_ready(ready, completed, ).each do |t| name, result = t.value completed[name] = result end end completed end |
#run_spec(spec, message:) ⇒ String
Run a single spec and return the result string.
39 40 41 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 39 def run_spec(spec, message:) execute_spec(spec, ) end |
#shutdown ⇒ Object
Gracefully shut down worker Ractors.
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/robot_lab/ractor_network_scheduler.rb', line 66 def shutdown return if @closed @closed = true @size.times { @work_q.push(nil) } @workers.each do |w| w.join rescue StandardError nil end end |