Class: RactorPool
- Inherits: Object
- Defined in: lib/ractor-pool.rb, lib/ractor-pool/version.rb
Overview
A thread-safe, lock-free pool of Ractor workers with a coordinator pattern for distributing work.
RactorPool manages a fixed number of worker ractors that process work items in parallel. Work is distributed on-demand to idle workers, ensuring efficient utilisation. Results are collected and passed to a result handler running in a separate thread.
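The distribution model described above can be sketched with plain threads and queues. This is only an analogy under stated assumptions: RactorPool uses Ractors rather than threads, and `run_pool_sketch`, its `worker:` lambda, and the `done` sentinel are hypothetical names that are not part of the library.

```ruby
# Thread-based analogy only; this is not how RactorPool is implemented.
def run_pool_sketch(items, worker:, size: 3, &result_handler)
  work    = Queue.new   # pending work items
  results = Queue.new   # finished results awaiting the handler
  done    = Object.new  # sentinel marking the end of a queue

  # The result handler runs in its own thread.
  collector = Thread.new do
    until (result = results.pop).equal?(done)
      result_handler.call(result)
    end
  end

  # A fixed number of workers; each pulls its next item only when idle.
  workers = Array.new(size) do
    Thread.new do
      until (item = work.pop).equal?(done)
        results << worker.call(item)
      end
    end
  end

  items.each { |item| work << item }
  size.times { work << done }  # one sentinel per worker
  workers.each(&:join)
  results << done              # all workers finished, stop the handler
  collector.join
end

squares = []
run_pool_sketch([1, 2, 3, 4], worker: ->(n) { n * n }) { |result| squares << result }
```

Results reach the handler in completion order, which need not match submission order.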
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Constant Summary
- VERSION = "0.3.0"
Instance Method Summary
- #<<(work) ⇒ void
  Queues a work item to be processed by an available worker.
- #initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil) {|result| ... } ⇒ void (constructor)
  Creates a new RactorPool with the specified number of workers.
- #shutdown ⇒ void
  Shuts down the pool gracefully.
Constructor Details
#initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil) {|result| ... } ⇒ void
Creates a new RactorPool with the specified number of workers.
    # File 'lib/ractor-pool.rb', line 92

    def initialize(size: Etc.nprocessors, worker:, name: nil, on_error: nil, &result_handler)
      raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
      raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
      raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)

      @size = size
      @worker = Ractor.shareable_proc(&worker)
      @name = name
      @on_error = Ractor.shareable_proc(&on_error) if on_error
      @result_handler = result_handler

      @in_flight = Atom.new(0)
      @shutdown = Atom.new(false)

      @result_port = Ractor::Port.new if result_handler
      @error_port = Ractor::Port.new unless on_error
      @coordinator = start_coordinator if size > 1
      @workers = start_workers
      @error_collector = start_error_collector
      @collector = start_collector
    end
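To see which arguments the constructor rejects, its three guard clauses can be exercised in isolation. The helper names `check_pool_args` and `rejection_message` are hypothetical and exist only for this sketch; the conditions and messages are copied from the listing above.

```ruby
# Hypothetical helper that mirrors only the constructor's guard clauses.
def check_pool_args(size:, worker:, on_error: nil)
  raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
  raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
  raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)
  true
end

# Returns the ArgumentError message, or nil when the arguments pass.
def rejection_message(**args)
  check_pool_args(**args)
  nil
rescue ArgumentError => e
  e.message
end

identity = ->(x) { x }

accepted   = rejection_message(size: 2, worker: identity)
zero_size  = rejection_message(size: 0, worker: identity)
float_size = rejection_message(size: 2.0, worker: identity)
bad_worker = rejection_message(size: 2, worker: :not_a_proc)
bad_error  = rejection_message(size: 2, worker: identity, on_error: "log it")
```

Note that a Float such as `2.0` fails the `size` check, because the guard requires an Integer.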
Instance Method Details
#<<(work) ⇒ void
This method returns an undefined value.
Queues a work item to be processed by an available worker.
    # File 'lib/ractor-pool.rb', line 125

    def <<(work)
      raise EnqueuedWorkAfterShutdownError if @shutdown.value

      @in_flight.swap { |count| count + 1 }

      if @shutdown.value
        @in_flight.swap { |count| count - 1 }
        raise EnqueuedWorkAfterShutdownError
      end

      begin
        (@coordinator || @workers.first).send(work, move: true)
      ensure
        @in_flight.swap { |count| count - 1 }
      end
    end
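The listing above checks the shutdown flag twice: once before incrementing the in-flight counter and once after. The second check covers a shutdown that begins between the first check and the increment. A minimal sketch of that pattern follows. `SketchAtom` is an assumed Mutex-based stand-in for the `Atom` class (whose implementation is not shown here, and which is presumably lock-free rather than Mutex-based), and `SketchGate` and `ClosedError` are hypothetical names.

```ruby
# Assumed stand-in for Atom: same value/swap surface, but Mutex-based.
class SketchAtom
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

class SketchGate
  class ClosedError < StandardError; end

  attr_reader :delivered

  def initialize
    @in_flight = SketchAtom.new(0)
    @closed    = SketchAtom.new(false)
    @delivered = Queue.new
  end

  def <<(work)
    raise ClosedError if @closed.value       # fast path: already closed
    @in_flight.swap { |count| count + 1 }    # announce the submission
    if @closed.value                         # closed in the meantime?
      @in_flight.swap { |count| count - 1 }
      raise ClosedError
    end
    begin
      @delivered << work
    ensure
      @in_flight.swap { |count| count - 1 }  # always release the slot
    end
  end

  def close
    @closed.swap { true }
    Thread.pass until @in_flight.value.zero?  # wait for submissions to drain
  end

  def in_flight
    @in_flight.value
  end
end

gate = SketchGate.new
gate << :a
gate.close
rejected = begin
  gate << :b
  false
rescue SketchGate::ClosedError
  true
end
```

Because every submission raises the counter before the final flag check, `close` can wait for the counter to reach zero and know that no accepted item is still being handed over.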
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the pool gracefully.
This method:
- Prevents new work from being queued
- Waits for all in-flight work submissions to complete
- Allows all queued work to complete
- Waits for all workers to finish
- Waits for all results and errors to be processed
This method is idempotent and can be called multiple times safely.
    # File 'lib/ractor-pool.rb', line 159

    def shutdown
      already_shutdown = false
      @shutdown.swap do |current|
        if current
          already_shutdown = true
          current
        else
          true
        end
      end
      return if already_shutdown

      Thread.pass until @in_flight.value.zero?

      @coordinator&.send(SHUTDOWN, move: true) ||
        (@workers.first.send(SHUTDOWN, move: true) && @result_port&.send(SHUTDOWN, move: true))
      @workers.each(&:join)
      @coordinator&.join
      @error_port&.send(SHUTDOWN, move: true)
      @error_collector&.join
      @collector&.join
    end
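The idempotence of `shutdown` comes from the swap at the top of the method: only the caller that flips the flag from false to true continues to the teardown steps. The sketch below isolates that idea under stated assumptions. `SketchFlag` is a Mutex-based stand-in for `Atom`, and `SketchService` with its `teardown_runs` counter is hypothetical.

```ruby
# Assumed stand-in for Atom, reduced to the swap operation.
class SketchFlag
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

class SketchService
  attr_reader :teardown_runs

  def initialize
    @shutdown      = SketchFlag.new(false)
    @teardown_runs = 0
  end

  def shutdown
    already_shutdown = false
    @shutdown.swap do |current|
      already_shutdown = current  # true if another caller got here first
      true                        # the flag stays set from now on
    end
    return if already_shutdown

    @teardown_runs += 1           # placeholder for the real teardown steps
  end
end

service = SketchService.new
Array.new(4) { Thread.new { service.shutdown } }.each(&:join)
service.shutdown  # a later call is a no-op
```

However many threads call `shutdown`, the teardown body runs once, which matches the documented guarantee that the method can be called multiple times safely.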