Class: RactorPool
- Inherits: Object
  - Object
  - RactorPool
- Defined in: lib/ractor-pool.rb, lib/ractor-pool/version.rb
Overview
A thread-safe, lock-free pool of Ractor workers with coordinator or round-robin dispatch for distributing work.
RactorPool manages a fixed number of worker ractors that process work items in parallel.

The :coordinator strategy (the default) routes each work item to whichever worker is currently idle via a dedicated coordinator Ractor, so a slow item on one worker does not block faster items from being picked up by other workers. Use it when work items have variable cost.

The :round_robin strategy dispatches work to workers in turn, so a slow item on one worker queues the next item destined for that worker behind it, even if other workers are idle. Use it when work items have uniform cost.

Results are collected and passed to a result handler running in a separate thread.
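The difference between the two strategies can be illustrated with a small deterministic model. This is only a sketch: it uses no Ractors or threads, the method names `round_robin_makespan` and `idle_first_makespan` are hypothetical, and "idle-first" is a simplification of what a coordinator does. Each worker is represented solely by the time at which it becomes free.

```ruby
# Round-robin: item i always goes to worker i % size, regardless of load.
def round_robin_makespan(costs, size)
  free_at = Array.new(size, 0)
  costs.each_with_index { |cost, i| free_at[i % size] += cost }
  free_at.max
end

# Idle-first (coordinator-like): each item goes to the worker that frees up soonest.
def idle_first_makespan(costs, size)
  free_at = Array.new(size, 0)
  costs.each do |cost|
    idx = free_at.each_with_index.min_by { |time, _| time }.last
    free_at[idx] += cost
  end
  free_at.max
end

variable = [5, 1, 1, 1] # one slow item followed by three fast ones
uniform  = [2, 2, 2, 2]

puts round_robin_makespan(variable, 2) # 6
puts idle_first_makespan(variable, 2)  # 5
puts round_robin_makespan(uniform, 2)  # 4
puts idle_first_makespan(uniform, 2)   # 4
```

With variable costs the fast items pile up behind the slow one under round-robin, while with uniform costs both models finish at the same time, which is consistent with the guidance above.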
Defined Under Namespace
Classes: EnqueuedWorkAfterShutdownError, Error
Constant Summary
- VERSION = "0.4.0"
Instance Method Summary
- #<<(work) ⇒ void
  Queues a work item to be processed by an available worker.
- #initialize(size: Etc.nprocessors, worker:, strategy: :coordinator, name: nil, on_error: nil) {|result| ... } ⇒ void (constructor)
  Creates a new RactorPool with the specified number of workers.
- #shutdown ⇒ void
  Shuts down the pool gracefully.
Constructor Details
#initialize(size: Etc.nprocessors, worker:, strategy: :coordinator, name: nil, on_error: nil) {|result| ... } ⇒ void
Creates a new RactorPool with the specified number of workers.
# File 'lib/ractor-pool.rb', line 107

    def initialize(size: Etc.nprocessors, worker:, strategy: :coordinator, name: nil, on_error: nil, &result_handler)
      raise ArgumentError, "size must be a positive Integer" unless size.is_a?(Integer) && size > 0
      raise ArgumentError, "worker must be a Proc" unless worker.is_a?(Proc)
      raise ArgumentError, "strategy must be one of #{STRATEGIES.inspect}" unless STRATEGIES.include?(strategy)
      raise ArgumentError, "on_error must be a Proc" if on_error && !on_error.is_a?(Proc)

      @size = size
      @worker = Ractor.shareable_proc(&worker)
      @strategy = strategy
      @name = name
      @on_error = Ractor.shareable_proc(&on_error) if on_error
      @result_handler = result_handler

      @in_flight = Atom.new(0)
      @shutdown = Atom.new(false)
      @next_worker_index = Atom.new(-1) if size > 1 && strategy == :round_robin
      @result_port = Ractor::Port.new if result_handler
      @error_port = Ractor::Port.new unless on_error
      @coordinator = start_coordinator if size > 1 && strategy == :coordinator
      @workers = start_workers
      @collector = start_collector
      @error_collector = start_error_collector
    end
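The conditional assignments in the constructor decide which internal pieces exist for a given configuration. The sketch below restates only those conditionals as a pure function so the combinations are easier to see. `pool_components` is a hypothetical helper, not part of the library; it builds nothing and takes plain booleans in place of the real `on_error` Proc and result handler block.

```ruby
# Hypothetical helper: mirrors the conditionals in the listing above.
def pool_components(size:, strategy: :coordinator, on_error: false, result_handler: false)
  components = [:workers]
  components << :coordinator if size > 1 && strategy == :coordinator
  components << :next_worker_index if size > 1 && strategy == :round_robin
  components << :result_port if result_handler
  components << :error_port unless on_error
  components
end

p pool_components(size: 4)
# [:workers, :coordinator, :error_port]
p pool_components(size: 4, strategy: :round_robin, result_handler: true)
# [:workers, :next_worker_index, :result_port, :error_port]
p pool_components(size: 1, on_error: true)
# [:workers]
```

Note that a pool of size 1 gets neither a coordinator nor a round-robin index under either strategy.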
Instance Method Details
#<<(work) ⇒ void
This method returns an undefined value.
Queues a work item to be processed by an available worker.
# File 'lib/ractor-pool.rb', line 143

    def <<(work)
      raise EnqueuedWorkAfterShutdownError if @shutdown.value

      @in_flight.swap { |count| count + 1 }

      if @shutdown.value
        @in_flight.swap { |count| count - 1 }
        raise EnqueuedWorkAfterShutdownError
      end

      target =
        if @coordinator
          @coordinator
        elsif @next_worker_index
          @workers[@next_worker_index.swap { |index| (index + 1) % @size }]
        else
          @workers.first
        end

      begin
        target.send(work, move: true)
      ensure
        @in_flight.swap { |count| count - 1 }
      end
    end
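The round-robin branch picks a worker by advancing a shared index modulo the pool size. The following sketch shows the resulting index sequence. `TinyAtom` is a hypothetical stand-in for the `Atom` class used in the listing: it uses a Mutex rather than a lock-free mechanism, and it assumes that `swap` returns the updated value, which the listing's use of the result as an array index suggests but does not confirm.

```ruby
# Hypothetical stand-in for Atom; a Mutex replaces the real implementation.
class TinyAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Applies the block to the current value, then stores and returns the result.
  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

size = 3
next_index = TinyAtom.new(-1) # starts at -1 so the first pick is index 0
picks = Array.new(7) { next_index.swap { |index| (index + 1) % size } }
p picks # [0, 1, 2, 0, 1, 2, 0]
```

Because each swap is atomic, concurrent callers never receive the same step of the sequence, so work stays evenly spread across workers regardless of which thread submits it.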
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the pool gracefully.
This method:
- Prevents new work from being queued
- Waits for all in-flight work submissions to complete
- Allows all queued work to complete
- Waits for all workers to finish
- Waits for all results and errors to be processed
This method is idempotent and can be called multiple times safely.
# File 'lib/ractor-pool.rb', line 185

    def shutdown
      already_shutdown = false
      @shutdown.swap do |current|
        if current
          already_shutdown = true
          current
        else
          true
        end
      end
      return if already_shutdown

      Thread.pass until @in_flight.value.zero?

      if @coordinator
        @coordinator.send(SHUTDOWN, move: true)
        @workers.each(&:join)
        @coordinator.join
      else
        @workers.each { |worker| worker.send(SHUTDOWN, move: true) }
        @workers.each(&:join)
        @result_port&.send(SHUTDOWN, move: true)
      end

      @error_port&.send(SHUTDOWN, move: true)
      @error_collector&.join
      @collector&.join
    end
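The shutdown sequence combines a flag that only the first caller can flip with a sentinel that is queued behind any pending work. A thread-and-Queue analogue of that pattern might look like the sketch below. It is not the library's code: `TinyPool`, its sentinel value, and its boolean return value are all hypothetical, a Mutex replaces the Atom-based flag and in-flight counter, and threads replace Ractors.

```ruby
class TinyPool
  SHUTDOWN = :__shutdown__ # hypothetical sentinel value

  attr_reader :results

  def initialize(size, &work)
    @lock = Mutex.new
    @shutdown = false
    @size = size
    @queue = Queue.new
    @results = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Items queued before the sentinel are processed first (FIFO).
        while (item = @queue.pop) != SHUTDOWN
          @results << work.call(item)
        end
      end
    end
  end

  def <<(item)
    @lock.synchronize do
      raise "pool is shut down" if @shutdown
      @queue << item
    end
  end

  # Returns true only for the call that actually performed the shutdown.
  def shutdown
    first_call = @lock.synchronize do
      was_open = !@shutdown
      @shutdown = true
      was_open
    end
    return false unless first_call

    @size.times { @queue << SHUTDOWN } # one sentinel per worker
    @workers.each(&:join)
    true
  end
end

pool = TinyPool.new(2) { |n| n * n }
(1..5).each { |n| pool << n }
p pool.shutdown # true: queued items are drained before the workers exit
p pool.shutdown # false: later calls are no-ops
squares = []
squares << pool.results.pop until pool.results.empty?
p squares.sort # [1, 4, 9, 16, 25]
```

Holding the lock while enqueuing plays the role that the in-flight counter plays in the listing: no item can slip into the queue after the sentinels.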