Class: RobotLab::RactorWorkerPool
- Inherits: Object
- Defined in: lib/robot_lab/ractor_worker_pool.rb
Overview
A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.
Work is distributed via a shared RactorQueue. Each worker runs a blocking loop: it pops a RactorJob, dispatches to the named tool class, and pushes the frozen result (or a RactorJobError) to that job's own reply_queue.
Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.
Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.
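The flow described above (shared work queue, per-job reply queue, one nil sentinel per worker) can be sketched with plain threads and the standard library's Queue. This is only an illustration of the pattern, not the RobotLab implementation: it swaps Ractors for threads, and Job and MiniPool are hypothetical names invented for the example.

```ruby
# Thread-based stand-in for the pattern; Job and MiniPool are hypothetical
# names and are not part of RobotLab.
Job = Struct.new(:tool, :args, :reply_queue)

class MiniPool
  def initialize(size:, tools:)
    @size    = size
    @tools   = tools
    @work_q  = Queue.new
    @workers = Array.new(size) { spawn_worker }
  end

  # Push a job, then block on its private reply queue.
  def submit(tool, args)
    reply_q = Queue.new
    @work_q.push(Job.new(tool, args, reply_q))
    reply_q.pop
  end

  # One nil per worker; each worker exits when it pops nil.
  def shutdown
    @size.times { @work_q.push(nil) }
    @workers.each(&:join)
  end

  private

  def spawn_worker
    Thread.new do
      # Queue#pop blocks until an item arrives; nil ends the loop.
      while (job = @work_q.pop)
        result = @tools.fetch(job.tool).call(job.args)
        job.reply_queue.push(result.freeze)
      end
    end
  end
end

pool   = MiniPool.new(size: 2, tools: { "square" => ->(args) { args[:n] * args[:n] } })
answer = pool.submit("square", { n: 7 })
pool.shutdown
```

Because every job carries its own reply queue, a caller only ever sees the result of the job it submitted, even when several callers share the pool.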
Constant Summary
- QUEUE_CAPACITY = 1024
Instance Attribute Summary
- #size ⇒ Object (readonly)
  Returns the value of attribute size.
Instance Method Summary
- #initialize(size: :auto) ⇒ RactorWorkerPool (constructor)
  A new instance of RactorWorkerPool.
- #shutdown ⇒ void
  Gracefully shut down the pool via poison-pill pattern.
- #submit(tool_class_name, args) ⇒ Object
  Submit a tool job and block until the result is available.
Constructor Details
#initialize(size: :auto) ⇒ RactorWorkerPool
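Returns a new instance of RactorWorkerPool. With size: :auto (the default) the pool starts one worker per processor reported by Etc.nprocessors; any other value is converted with to_i.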
    # File 'lib/robot_lab/ractor_worker_pool.rb', line 28
    def initialize(size: :auto)
      @size    = size == :auto ? Etc.nprocessors : size.to_i
      @closed  = false
      @work_q  = RactorQueue.new(capacity: QUEUE_CAPACITY)
      @workers = @size.times.map { spawn_worker(@work_q) }
    end
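To make the size handling concrete, the expression from the constructor can be tried in isolation. The helper name resolve_size is hypothetical and exists only for this sketch; the expression inside it is copied from the source above.

```ruby
require "etc"

# resolve_size is a hypothetical helper that wraps the constructor's expression.
def resolve_size(size)
  size == :auto ? Etc.nprocessors : size.to_i
end

auto_size  = resolve_size(:auto) # processor count reported by the OS
fixed_size = resolve_size(4)     # integers pass through unchanged
from_str   = resolve_size("8")   # numeric strings are converted by to_i
from_nil   = resolve_size(nil)   # nil.to_i is 0, so no workers would be spawned
```

Since to_i silently maps nil and non-numeric strings to 0, a misconfigured size would yield a pool with no workers, and a job submitted to it would presumably never be picked up. Validating the size before constructing the pool may be worthwhile.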
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size, which is the number of workers spawned by the constructor.

    # File 'lib/robot_lab/ractor_worker_pool.rb', line 25
    def size
      @size
    end
Instance Method Details
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shut down the pool via poison-pill pattern: one nil sentinel per worker is pushed to the work queue, then each worker is joined. Calling shutdown on an already closed pool is a no-op.

    # File 'lib/robot_lab/ractor_worker_pool.rb', line 69
    def shutdown
      return if @closed
      @closed = true

      @size.times { @work_q.push(nil) }
      @workers.each { |w| w.join rescue nil }
    end
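The effect of the @closed guard can be illustrated with a small thread-based sketch. GuardedPool and its sentinels_pushed counter are hypothetical and added purely for observation; the real class uses Ractors and a RactorQueue.

```ruby
# Hypothetical thread-based sketch of the closed-flag guard; not the RobotLab code.
class GuardedPool
  attr_reader :sentinels_pushed

  def initialize(size)
    @size = size
    @closed = false
    @sentinels_pushed = 0
    @work_q = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        loop do
          break if @work_q.pop.nil? # nil is the poison pill
        end
      end
    end
  end

  def closed?
    @closed
  end

  def shutdown
    return if @closed # second and later calls do nothing

    @closed = true
    @size.times do
      @work_q.push(nil)
      @sentinels_pushed += 1
    end
    @workers.each(&:join)
  end
end

guarded = GuardedPool.new(3)
guarded.shutdown
guarded.shutdown # no-op: no extra sentinels, no second join
```

Without the guard, a second call would push sentinels that no worker is left to consume.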
#submit(tool_class_name, args) ⇒ Object
Submit a tool job and block until the result is available. Raises ToolError if the pool has been shut down or if the worker replies with a RactorJobError.

    # File 'lib/robot_lab/ractor_worker_pool.rb', line 41
    def submit(tool_class_name, args)
      raise ToolError, "Pool is shut down" if @closed

      reply_q = RactorQueue.new(capacity: 1)
      payload = RactorBoundary.freeze_deep({
        tool_class: tool_class_name.to_s,
        args: args
      })

      job = RactorJob.new(
        id: SecureRandom.uuid.freeze,
        type: :tool,
        payload: payload,
        reply_queue: reply_q
      )

      @work_q.push(job)
      result = reply_q.pop

      if result.is_a?(RactorJobError)
        raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.}"
      end

      result
    end
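The result-or-error convention used by submit can be sketched as follows. JobError and ToolFailure are stand-ins invented for this example (the real RactorJobError and ToolError live in RobotLab), and the message field on the stand-in is an assumption, since the accessor called on the real error object is not visible in the listing above. Threads and the standard Queue replace Ractors and RactorQueue.

```ruby
# Stand-ins for illustration only; the `message` field is an assumption.
JobError = Struct.new(:message)
class ToolFailure < StandardError; end

# Worker side: reply with a frozen result, or with an error value on failure.
def run_job(reply_q)
  result = yield
  reply_q.push(result.freeze)
rescue StandardError => e
  reply_q.push(JobError.new(e.message).freeze)
end

# Caller side: block on the reply, turning an error value into an exception.
def await_reply(reply_q, tool_name)
  result = reply_q.pop
  raise ToolFailure, "Tool '#{tool_name}' failed: #{result.message}" if result.is_a?(JobError)

  result
end

ok_q = Queue.new
Thread.new { run_job(ok_q) { 6 * 7 } }.join
ok = await_reply(ok_q, "multiply")

bad_q = Queue.new
Thread.new { run_job(bad_q) { raise ArgumentError, "bad input" } }.join
error_message =
  begin
    await_reply(bad_q, "multiply")
  rescue ToolFailure => e
    e.message
  end
```

Passing failures back as values keeps the worker loop alive after a tool raises; the exception is re-created on the caller's side, where it can be handled.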