Class: RobotLab::RactorWorkerPool
- Inherits:
-
Object
- Object
- RobotLab::RactorWorkerPool
- Defined in:
- lib/robot_lab/ractor_worker_pool.rb
Overview
A pool of Ractor workers that execute CPU-bound, Ractor-safe tools.
Work is distributed via a shared RactorQueue. Each worker runs a blocking loop, pops RactorJob instances, dispatches to the named tool class, and pushes the frozen result (or a RactorJobError) to the job’s per-job reply_queue.
Shutdown uses a poison-pill pattern: one nil sentinel per worker is pushed to the work queue; each worker exits when it pops nil.
Only tools that declare ractor_safe true should be submitted. Tool classes are instantiated fresh inside the Ractor for each call.
Constant Summary collapse
- QUEUE_CAPACITY =
1024
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Class Method Summary collapse
-
.process_job(job) ⇒ Object
Called inside Ractor worker blocks — must be a class method.
- .wrap_error(err) ⇒ Object
Instance Method Summary collapse
-
#initialize(size: :auto) ⇒ RactorWorkerPool
constructor
A new instance of RactorWorkerPool.
-
#shutdown ⇒ void
Gracefully shut down the pool via poison-pill pattern.
-
#submit(tool_class_name, args) ⇒ Object
Submit a tool job and block until the result is available.
Constructor Details
#initialize(size: :auto) ⇒ RactorWorkerPool
Returns a new instance of RactorWorkerPool.
28 29 30 31 32 33 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 28 def initialize(size: :auto) @size = size == :auto ? Etc.nprocessors : size.to_i @closed = false @work_q = RactorQueue.new(capacity: QUEUE_CAPACITY) @workers = @size.times.map { spawn_worker(@work_q) } end |
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the value of attribute size.
25 26 27 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 25 def size @size end |
Class Method Details
.process_job(job) ⇒ Object
Called inside Ractor worker blocks — must be a class method.
77 78 79 80 81 82 83 84 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 77 def self.process_job(job) tool_class = Object.const_get(job.payload[:tool_class]) result = tool_class.new.execute(**job.payload[:args].transform_keys(&:to_sym)) frozen_result = ::Ractor.make_shareable(result.frozen? ? result : result.dup.freeze) job.reply_queue.push(frozen_result) rescue StandardError => e job.reply_queue.push(wrap_error(e)) end |
.wrap_error(err) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 86 def self.wrap_error(err) RobotLab::RactorJobError.new( message: err..freeze, backtrace: (err.backtrace || []).map(&:freeze).freeze ) end |
Instance Method Details
#shutdown ⇒ void
This method returns an undefined value.
Gracefully shut down the pool via poison-pill pattern.
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 64 def shutdown return if @closed @closed = true @size.times { @work_q.push(nil) } @workers.each do |w| w.join rescue StandardError nil end end |
#submit(tool_class_name, args) ⇒ Object
Submit a tool job and block until the result is available.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/robot_lab/ractor_worker_pool.rb', line 41 def submit(tool_class_name, args) raise ToolError, 'Pool is shut down' if @closed reply_q = RactorQueue.new(capacity: 1) payload = RactorBoundary.freeze_deep({ tool_class: tool_class_name.to_s, args: args }) job = RactorJob.new( id: SecureRandom.uuid.freeze, type: :tool, payload: payload, reply_queue: reply_q ) @work_q.push(job) result = reply_q.pop raise ToolError, "Tool '#{tool_class_name}' failed in Ractor: #{result.}" if result.is_a?(RactorJobError) result end |