Class: RobotLab::RactorWorkerPool
- Inherits: Object
  - Object
  - RobotLab::RactorWorkerPool
- Defined in: lib/robot_lab/ractor_worker_pool.rb
Overview
A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.
Work is distributed via a shared RactorQueue. Each worker runs a blocking loop, pops RactorJob instances, dispatches to the named tool class, and pushes the frozen result (or a RactorJobError) to the job’s per-job reply_queue.
Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.
Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.
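The loop described above (pop a job, dispatch, reply on a per-job queue, exit on nil) can be sketched with the standard library alone. This is an illustration, not the library's code: Thread and Queue stand in for Ractor and RactorQueue, and Job, TOOLS, spawn_worker and run_job are hypothetical names invented for the example.

```ruby
# Sketch only: Thread/Queue replace Ractor/RactorQueue, and every name
# below (Job, TOOLS, run_job) is hypothetical, not part of RobotLab.
Job = Struct.new(:tool, :args, :reply_queue)

TOOLS = {
  "Square" => ->(args) { args[:n] * args[:n] }
}.freeze

def spawn_worker(work_q)
  Thread.new do
    loop do
      job = work_q.pop
      break if job.nil? # poison pill: leave the loop and end the worker

      result =
        begin
          TOOLS.fetch(job.tool).call(job.args).freeze
        rescue StandardError => e
          e # failures travel back as values, not as raised exceptions
        end
      job.reply_queue.push(result)
    end
  end
end

# Push one job and block on its private reply queue.
def run_job(work_q, tool, args)
  reply_q = Queue.new
  work_q.push(Job.new(tool, args, reply_q))
  reply_q.pop
end

work_q  = Queue.new
workers = Array.new(2) { spawn_worker(work_q) }

squared = run_job(work_q, "Square", { n: 7 })
failure = run_job(work_q, "Missing", {}) # unknown tool comes back as an error value

workers.size.times { work_q.push(nil) }  # one pill per worker
workers.each(&:join)
```

Giving each job its own reply queue means a caller only ever sees the result of the job it submitted, even when several callers share the same work queue.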
Constant Summary
- QUEUE_CAPACITY = 1024
  Capacity of the shared work queue.
Instance Attribute Summary
- #size ⇒ Integer (readonly)
  Number of worker Ractors.
Instance Method Summary
- #initialize(size: :auto) ⇒ RactorWorkerPool (constructor)
  Creates a new pool and starts worker Ractors immediately.
- #shutdown ⇒ void
  Gracefully shut down the pool.
- #submit(tool_class_name, args) ⇒ Object
  Submit a tool job and block until the result is available.
Constructor Details
#initialize(size: :auto) ⇒ RactorWorkerPool
Creates a new pool and starts worker Ractors immediately.
    # File 'lib/robot_lab/ractor_worker_pool.rb', line 35

    def initialize(size: :auto)
      @size    = size == :auto ? Etc.nprocessors : size.to_i
      @closed  = false
      @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
      @workers = @size.times.map { spawn_worker(@work_q) }
    end
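How the size argument resolves follows from the ternary in the listing above. The sketch below isolates that expression in a hypothetical helper, resolve_pool_size, which is not part of the library, to show what different inputs produce.

```ruby
require "etc"

# Hypothetical helper mirroring the expression used in #initialize.
def resolve_pool_size(size)
  size == :auto ? Etc.nprocessors : size.to_i
end

auto_size   = resolve_pool_size(:auto) # processor count reported by the OS
fixed_size  = resolve_pool_size(4)     # an Integer passes through unchanged
string_size = resolve_pool_size("8")   # String#to_i parses the leading digits
nil_size    = resolve_pool_size(nil)   # nil.to_i is 0
```

Since to_i turns nil and non-numeric strings into 0, such arguments would appear to yield a pool with no workers. Assuming the reply queue's pop blocks as described for #submit, a job submitted to that pool would never be answered, so callers may want to validate size before constructing the pool.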
Instance Attribute Details
#size ⇒ Integer (readonly)
Returns the number of worker Ractors.

    # File 'lib/robot_lab/ractor_worker_pool.rb', line 30

    def size
      @size
    end
Instance Method Details
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shut down the pool.
Pushes one nil poison pill per worker so each exits its loop. Waits for all workers to terminate.
    # File 'lib/robot_lab/ractor_worker_pool.rb', line 81

    def shutdown
      return if @closed
      @closed = true

      # Push one nil poison pill per worker
      @size.times { @work_q.push(nil) }
      @workers.each { |w| w.join rescue nil }
    end
#submit(tool_class_name, args) ⇒ Object
Submit a tool job and block until the result is available.
    # File 'lib/robot_lab/ractor_worker_pool.rb', line 49

    def submit(tool_class_name, args)
      raise ToolError, "Pool is shut down" if @closed

      reply_q = RactorQueue.new(capacity: 1)
      payload = RactorBoundary.freeze_deep({ tool_class: tool_class_name.to_s, args: args })
      job = RactorJob.new(
        id: SecureRandom.uuid.freeze,
        type: :tool,
        payload: payload,
        reply_queue: reply_q
      )
      @work_q.push(job)
      result = reply_q.pop

      if result.is_a?(RactorJobError)
        raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.}"
      end

      result
    end
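The payload is deep-frozen before it crosses into a worker. The source does not show how RactorBoundary.freeze_deep is implemented, so the following is only a sketch of what a recursive freeze over hashes and arrays might look like. The deep_freeze helper is hypothetical and handles only Hash and Array containers.

```ruby
# Hypothetical stand-in for RactorBoundary.freeze_deep; the real
# implementation is not shown in this document and may differ.
def deep_freeze(obj)
  case obj
  when Hash
    obj.each do |key, value|
      deep_freeze(key)
      deep_freeze(value)
    end
  when Array
    obj.each { |element| deep_freeze(element) }
  end
  obj.freeze # freeze the container (or leaf) itself last
end

payload = deep_freeze(
  { tool_class: :Square.to_s, args: { values: [1, 2, "three"] } }
)
```

A shallow freeze would lock only the outer hash and leave the nested array and string mutable, which is why the recursion is needed. Ruby's built-in Ractor.make_shareable performs a comparable recursive freeze and could serve the same purpose.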