Class: CDC::Parallel::ProcessorPool
- Inherits:
-
Object
- Object
- CDC::Parallel::ProcessorPool
- Defined in:
- lib/cdc/parallel/processor_pool.rb,
sig/cdc/parallel/processor_pool.rbs
Overview
Executes one Ractor-safe processor in pre-warmed persistent Ractor workers.
Defined Under Namespace
Classes: WorkerSlot
Instance Method Summary collapse
- #collect_results(reply_port, count, assignments = []) ⇒ Array[result?]
- #collect_results_with_timeout(reply_port, results, assignments = []) ⇒ Array[result?]
- #collect_results_without_timeout(reply_port, results, assignments = []) ⇒ Array[result?]
-
#degraded? ⇒ Boolean
Whether any worker slot is in crash-loop cooldown.
- #dispatch(work_items, reply_port) ⇒ Array[WorkerSlot?]
- #initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ void constructor
- #next_worker_slot ⇒ WorkerSlot
-
#process(item) ⇒ CDC::Core::ProcessorResult
Process one work item synchronously.
-
#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
Process many work items using the pre-warmed worker pool.
-
#respawns ⇒ Integer
Total worker-slot respawns since pool boot.
-
#shutdown ⇒ void
Shut down the pool and call processor lifecycle hooks if manage_lifecycle is true.
- #signal_workers ⇒ void
- #timeout_results(results) ⇒ Array[result]
- #validate_processor!(processor) ⇒ void
- #wait_for_workers ⇒ void
- #wait_for_workers_with_timeout ⇒ void
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ void
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/cdc/parallel/processor_pool.rb', line 128 def initialize( processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true ) validate_processor!(processor) processor.start if manage_lifecycle @processor = ::Ractor.make_shareable(processor) @manage_lifecycle = manage_lifecycle @configuration = Configuration.new(size:, timeout:) @slots = Array.new(@configuration.size) do |index| WorkerSlot.new( index:, processor: @processor, supervision:, max_respawns:, respawn_window:, respawn_cooldown: ) end.freeze @workers = @slots.map(&:worker).freeze @worker_inboxes = @slots.map(&:inbox).freeze @next_worker = 0 @dispatch_mutex = Mutex.new @shutdown = false end |
Instance Method Details
#collect_results(reply_port, count, assignments = []) ⇒ Array[result?]
338 339 340 341 342 343 344 345 346 347 |
# File 'lib/cdc/parallel/processor_pool.rb', line 338 def collect_results(reply_port, count, assignments = []) results = Array.new(count) return results.freeze if count.zero? if @configuration.timeout collect_results_with_timeout(reply_port, results, assignments) else collect_results_without_timeout(reply_port, results, assignments) end end |
#collect_results_with_timeout(reply_port, results, assignments = []) ⇒ Array[result?]
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/cdc/parallel/processor_pool.rb', line 359 def collect_results_with_timeout(reply_port, results, assignments = []) timeout = @configuration.timeout return results.freeze unless timeout deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout results.length.times do remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) return timeout_results(results) unless remaining.positive? index, response = ::Timeout.timeout(remaining, TimeoutError) { reply_port.receive } results[index] = ResultCollector.normalize(response) assignments[index]&.complete(index) rescue TimeoutError return timeout_results(results) end results.freeze end |
#collect_results_without_timeout(reply_port, results, assignments = []) ⇒ Array[result?]
349 350 351 352 353 354 355 356 357 |
# File 'lib/cdc/parallel/processor_pool.rb', line 349 def collect_results_without_timeout(reply_port, results, assignments = []) results.length.times do index, response = reply_port.receive results[index] = ResultCollector.normalize(response) assignments[index]&.complete(index) end results.freeze end |
#degraded? ⇒ Boolean
Whether any worker slot is in crash-loop cooldown.
173 174 175 |
# File 'lib/cdc/parallel/processor_pool.rb', line 173 def degraded? @slots.any?(&:degraded?) end |
#dispatch(work_items, reply_port) ⇒ Array[WorkerSlot?]
243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/cdc/parallel/processor_pool.rb', line 243 def dispatch(work_items, reply_port) @dispatch_mutex.synchronize do raise ShutdownError, "processor pool has been shut down" if @shutdown assignments = Array.new(work_items.length) work_items.each_with_index do |item, index| slot = next_worker_slot assignments[index] = slot slot.send_work(index, item, reply_port) end assignments end end |
#next_worker_slot ⇒ WorkerSlot
329 330 331 332 333 334 335 336 |
# File 'lib/cdc/parallel/processor_pool.rb', line 329 def next_worker_slot slot = @slots[@next_worker] @next_worker += 1 @next_worker = 0 if @next_worker >= @slots.length slot end |
#process(item) ⇒ CDC::Core::ProcessorResult
Process one work item synchronously.
191 192 193 |
# File 'lib/cdc/parallel/processor_pool.rb', line 191 def process(item) process_many([item]).fetch(0) end |
#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
Process many work items using the pre-warmed worker pool.
208 209 210 211 212 213 214 215 216 217 |
# File 'lib/cdc/parallel/processor_pool.rb', line 208 def process_many(items) work_items = items.map { |item| ::Ractor.make_shareable(item) } reply_port = ::Ractor::Port.new assignments = dispatch(work_items, reply_port) collect_results(reply_port, work_items.length, assignments).compact.freeze ensure reply_port&.close end |
#respawns ⇒ Integer
Total worker-slot respawns since pool boot.
166 167 168 |
# File 'lib/cdc/parallel/processor_pool.rb', line 166 def respawns @slots.sum(&:respawns) end |
#shutdown ⇒ void
This method returns an undefined value.
Shut down the pool and call processor lifecycle hooks if manage_lifecycle is true.
226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/cdc/parallel/processor_pool.rb', line 226 def shutdown @dispatch_mutex.synchronize do return if @shutdown @shutdown = true signal_workers end wait_for_workers return unless @manage_lifecycle @processor.flush @processor.stop end |
#signal_workers ⇒ void
This method returns an undefined value.
257 258 259 |
# File 'lib/cdc/parallel/processor_pool.rb', line 257 def signal_workers @slots.each(&:shutdown) end |
#timeout_results(results) ⇒ Array[result]
379 380 381 382 383 384 385 386 387 388 |
# File 'lib/cdc/parallel/processor_pool.rb', line 379 def timeout_results(results) missing = results.count(&:nil?) timeout_error = TimeoutError.new( "processor pool timed out after #{@configuration.timeout} seconds waiting for #{missing} result(s)" ) results.map do |result| result || CDC::Core::ProcessorResult.failure(timeout_error) end.freeze end |
#validate_processor!(processor) ⇒ void
This method returns an undefined value.
285 286 287 288 289 290 291 |
# File 'lib/cdc/parallel/processor_pool.rb', line 285 def validate_processor!(processor) return if processor.class.respond_to?(:ractor_safe?) && processor.class.ractor_safe? raise UnsafeProcessorError, "#{processor.class} must declare ractor_safe!" end |
#wait_for_workers ⇒ void
This method returns an undefined value.
261 262 263 264 265 266 267 |
# File 'lib/cdc/parallel/processor_pool.rb', line 261 def wait_for_workers if @configuration.timeout wait_for_workers_with_timeout else @slots.each(&:join) end end |
#wait_for_workers_with_timeout ⇒ void
This method returns an undefined value.
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/cdc/parallel/processor_pool.rb', line 269 def wait_for_workers_with_timeout timeout = @configuration.timeout return unless timeout deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout @slots.each do |slot| remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) break unless remaining.positive? ::Timeout.timeout(remaining, TimeoutError) { slot.join } rescue TimeoutError break end end |