Class: RobotLab::RactorNetworkScheduler

Inherits:
Object
Defined in:
lib/robot_lab/ractor_network_scheduler.rb

Overview

Schedules frozen robot task descriptions across Ractor workers.

Robots stay in threads for LLM calls (ruby_llm is not Ractor-safe). The scheduler distributes frozen RobotSpec payloads; each worker constructs a fresh Robot, runs the task, and returns a frozen result.

Task ordering respects depends_on: tasks are only dispatched once all named dependencies have resolved (same topological semantics as SimpleFlow::Pipeline).
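The readiness rule above can be sketched in a few lines. This is an illustration only, not the library's own partition_ready (which is not shown on this page): ready_split and the plain-hash task shape are hypothetical, and the sketch assumes that :none or an empty list means "no dependencies".

```ruby
# Hypothetical sketch: split tasks into those whose dependencies have all
# completed and those that must keep waiting.
# Assumption: depends_on is :none or an Array of robot names.
def ready_split(tasks, completed)
  tasks.partition do |task|
    deps = task[:depends_on] == :none ? [] : Array(task[:depends_on])
    deps.all? { |name| completed.key?(name) }
  end
end

tasks = [
  { name: "analyst", depends_on: :none },
  { name: "writer",  depends_on: ["analyst"] }
]

# First pass: nothing has completed, so only the analyst is ready.
ready, waiting = ready_split(tasks, {})
first_wave = ready.map { |t| t[:name] }

# Second pass: once the analyst has a result, the writer becomes ready.
ready, waiting = ready_split(waiting, { "analyst" => "done" })
second_wave = ready.map { |t| t[:name] }
```

If a pass yields no ready tasks while some are still waiting, the remaining dependencies can never resolve, which is the condition run_pipeline reports as a circular or unresolvable dependency.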

Examples:

scheduler = RactorNetworkScheduler.new(memory: shared_memory)
scheduler.run_pipeline([
  { spec: analyst_spec, depends_on: :none },
  { spec: writer_spec,  depends_on: ["analyst"] }
], message: "Process this")
scheduler.shutdown

Constant Summary

QUEUE_CAPACITY = 256


Constructor Details

#initialize(memory:, pool_size: :auto) ⇒ RactorNetworkScheduler

Returns a new instance of RactorNetworkScheduler.

Parameters:

  • memory (Memory)

    shared network memory for all robot tasks

  • pool_size (Integer, :auto) (defaults to: :auto)

    number of Ractor workers; :auto uses Etc.nprocessors, and any other value is converted with to_i



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 27

def initialize(memory:, pool_size: :auto)
  @memory  = memory
  @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
  @size    = pool_size == :auto ? Etc.nprocessors : pool_size.to_i
  @workers = @size.times.map { spawn_worker(@work_q) }
  @closed  = false
end
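The pool-size rule in the constructor can be tried in isolation. The helper name resolve_pool_size is hypothetical; the expression inside it is copied from the listing above.

```ruby
require "etc"

# Hypothetical helper mirroring the constructor's pool-size expression.
def resolve_pool_size(pool_size)
  pool_size == :auto ? Etc.nprocessors : pool_size.to_i
end

resolve_pool_size(:auto) # one worker per processor reported by Etc
resolve_pool_size(4)     # => 4
resolve_pool_size("2")   # => 2, because String#to_i parses the digits
resolve_pool_size(nil)   # => 0, because nil.to_i is 0
```

Because to_i never raises for nil or non-numeric strings, an invalid pool_size silently resolves to zero workers, so callers may want to validate the value before constructing the scheduler.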

Class Method Details

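.build_and_run_robot(spec, message) ⇒ Object

Builds a fresh RobotLab::Robot from the spec, runs it with the message, and returns the robot's last_text_content as a frozen String.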



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 87

def self.build_and_run_robot(spec, message)
  config = spec.config_hash.empty? ? nil : RobotLab::RunConfig.new(**spec.config_hash.transform_keys(&:to_sym))
  robot  = RobotLab::Robot.new(
    name: spec.name,
    template: spec.template&.to_sym,
    system_prompt: spec.system_prompt,
    config: config
  )
  robot.run(message).last_text_content.to_s.freeze
end

.process_job(job) ⇒ Object

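Called inside Ractor worker blocks, so it must be a class method. Pushes the robot's result onto the job's reply queue, or a wrapped error if the run raises a StandardError.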



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 79

def self.process_job(job)
  spec    = job.payload[:spec]
  message = job.payload[:message]
  job.reply_queue.push(build_and_run_robot(spec, message))
rescue StandardError => e
  job.reply_queue.push(wrap_error(e))
end
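The reply pattern used by process_job (exactly one reply per job, either a result or an error value) can be sketched with standard-library pieces. Everything here is a stand-in: the real job and reply queue classes are not shown on this page, so a Struct and a plain Queue are assumed, and the raw exception is pushed instead of a wrapped error.

```ruby
# Stand-in job type (assumption): a payload plus a queue to reply on.
Job = Struct.new(:payload, :reply_queue)

# Push either the block's result or the raised exception onto the reply
# queue, so the caller always receives exactly one reply per job.
def process(job)
  job.reply_queue.push(yield(job.payload))
rescue StandardError => e
  job.reply_queue.push(e)
end

# Hypothetical caller side: submit a job, then block until the reply arrives.
def reply_for(payload, &work)
  job = Job.new(payload, Queue.new)
  process(job, &work)
  job.reply_queue.pop
end

ok  = reply_for({ message: "hi" }) { |payload| payload[:message].upcase }
err = reply_for({ message: "hi" }) { |_payload| raise ArgumentError, "boom" }
```

Since failures arrive as ordinary values, the caller has to inspect each reply's class to tell a result from an error.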

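.wrap_error(err) ⇒ Object

Converts an exception into a RobotLab::RactorJobError that carries a frozen message and a frozen backtrace.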



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 98

def self.wrap_error(err)
  RobotLab::RactorJobError.new(
    message: err.message.freeze,
    backtrace: (err.backtrace || []).map(&:freeze).freeze
  )
end
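Freezing the message and every backtrace line matters because only deeply frozen (shareable) objects can be shared between Ractors without copying. The sketch below uses a hypothetical JobError Struct in place of RobotLab::RactorJobError, whose definition is not shown here, and freezes the wrapper itself as an extra assumption.

```ruby
# Hypothetical stand-in for RobotLab::RactorJobError: a plain value object
# that holds only frozen strings.
JobError = Struct.new(:message, :backtrace, keyword_init: true)

def wrap(err)
  JobError.new(
    message: err.message.dup.freeze,
    backtrace: (err.backtrace || []).map { |line| line.dup.freeze }.freeze
  ).freeze
end

wrapped = begin
  raise ArgumentError, "bad input"
rescue ArgumentError => e
  wrap(e)
end

# Ractor.shareable? reports whether the whole object graph is frozen.
Ractor.shareable?(wrapped)
```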

Instance Method Details

#run_pipeline(specs_with_deps, message:) ⇒ Hash<String, String>

Run a pipeline of specs in dependency order.

Parameters:

  • specs_with_deps (Array<Hash>)

    each entry has :spec (the robot spec to run) and :depends_on (:none, or an Array of the robot names it must wait for)

  • message (String)

    initial message passed to entry-point robots

Returns:

  • (Hash<String, String>)

    name => result for each completed robot



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 48

def run_pipeline(specs_with_deps, message:)
  completed = {}
  remaining = specs_with_deps.dup

  until remaining.empty?
    ready, remaining = partition_ready(remaining, completed)
    raise RobotLab::Error, 'Circular dependency or unresolvable deps in RactorNetworkScheduler' if ready.empty?

    dispatch_ready(ready, completed, message).each do |t|
      name, result = t.value
      completed[name] = result
    end
  end

  completed
end
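The loop body treats each dispatched task as a thread whose value is a [name, result] pair. A minimal sketch of that collection step follows; run_wave is a hypothetical name, and the block stands in for the real robot run, since dispatch_ready is not shown on this page.

```ruby
# Hypothetical sketch: run each ready task in its own thread and collect
# [name, result] pairs with Thread#value, which waits for the thread to
# finish before returning its block's value.
def run_wave(names, completed)
  threads = names.map do |name|
    Thread.new { [name, yield(name)] }
  end

  threads.each do |thread|
    name, result = thread.value
    completed[name] = result
  end

  completed
end

completed = run_wave(%w[analyst researcher], {}) { |name| "#{name} output" }
```

Thread#value re-raises any exception that terminated the thread, so in this sketch a failing task aborts collection at that point instead of storing a result.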

#run_spec(spec, message:) ⇒ String

Run a single spec and return the result string.

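Parameters:

  • spec (RobotSpec)

    frozen robot task description to run

  • message (String)

    message passed to the robot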

Returns:

  • (String)

    the robot’s last_text_content



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 39

def run_spec(spec, message:)
  execute_spec(spec, message)
end

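#shutdown ⇒ Object

Gracefully shuts down the worker Ractors: pushes one nil sentinel per worker onto the work queue, then joins each worker. Calling it again after the scheduler is closed is a no-op.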



# File 'lib/robot_lab/ractor_network_scheduler.rb', line 66

def shutdown
  return if @closed

  @closed = true
  @size.times { @work_q.push(nil) }
  @workers.each do |w|
    w.join
  rescue StandardError
    nil
  end
end
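The sentinel-based shutdown can be demonstrated with threads and a standard Queue standing in for Ractors and RactorQueue. TinyPool and its methods are hypothetical names for this sketch; only the shutdown body follows the listing above.

```ruby
# Stand-in sketch: threads and Queue replace Ractors and RactorQueue.
class TinyPool
  attr_reader :processed

  def initialize(size)
    @queue     = Queue.new
    @processed = Queue.new
    @size      = size
    @closed    = false
    @workers   = Array.new(size) do
      Thread.new do
        # A nil job is the shutdown sentinel, so real jobs must be truthy.
        while (job = @queue.pop)
          @processed.push(job)
        end
      end
    end
  end

  def push(job)
    @queue.push(job)
  end

  def workers_alive?
    @workers.any?(&:alive?)
  end

  # One sentinel per worker: each worker exits after consuming exactly one
  # nil, and the queue's FIFO order means earlier jobs are taken first.
  def shutdown
    return if @closed

    @closed = true
    @size.times { @queue.push(nil) }
    @workers.each(&:join)
  end
end

pool = TinyPool.new(3)
5.times { |i| pool.push("job-#{i}") }
pool.shutdown
pool.shutdown # second call returns immediately
```

Pushing exactly as many sentinels as there are workers is what lets every worker stop without any of them needing to know about the others.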