Class: CDC::Parallel::ProcessorPool
- Inherits: Object
- Defined in: lib/cdc/parallel/processor_pool.rb
Overview
Executes one Ractor-safe processor in pre-warmed persistent Ractor workers.
Workers are created during initialization and reused for every dispatch. This pays Ractor startup cost once, keeps workers alive after processor failures, and provides both synchronous single-item processing and batched dispatch for throughput-oriented benchmarks and runtimes.
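The "keeps workers alive after processor failures" behaviour can be pictured with a small stand-in. This is only a sketch: it uses Thread and Queue in place of Ractor and Ractor::Port so that it runs on any recent Ruby, and the names `SketchResult`, `start_worker`, and `run_items` are hypothetical, not part of the library.

```ruby
# Stand-in sketch: Thread and Queue replace Ractor and Ractor::Port.
# SketchResult, start_worker and run_items are hypothetical names.
SketchResult = Struct.new(:ok, :value, :error)

def start_worker(inbox, processor)
  Thread.new do
    # One long-lived loop: the worker is created once and reused.
    while (message = inbox.pop)
      item, reply = message
      begin
        reply << SketchResult.new(true, processor.call(item), nil)
      rescue StandardError => e
        # A failing item becomes a failed result; the loop keeps running.
        reply << SketchResult.new(false, nil, e.message)
      end
    end
  end
end

def run_items(items, processor)
  inbox = Queue.new
  reply = Queue.new
  worker = start_worker(inbox, processor)
  results = items.map do |item|
    inbox << [item, reply]
    reply.pop
  end
  inbox << nil # stop sentinel ends the worker loop
  worker.join
  results
end

halve = lambda do |n|
  raise ArgumentError, "odd input" if n.odd?

  n / 2
end

run_items([4, 3, 10], halve).each do |result|
  puts result.ok ? "ok #{result.value}" : "failed: #{result.error}"
end
```

The second item fails, yet the same worker goes on to handle the third. That is the property the overview describes, although the real pool presumably reports failures through CDC::Core::ProcessorResult rather than a struct like this one.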
Instance Method Summary
- #initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void (constructor)
- #process(item) ⇒ CDC::Core::ProcessorResult
  Process one work item synchronously.
- #process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
  Process many work items using the pre-warmed worker pool.
- #shutdown ⇒ void
  Shut down the pool.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
    # File 'lib/cdc/parallel/processor_pool.rb', line 16
    def initialize(processor:, size: Etc.nprocessors, timeout: nil)
      validate_processor!(processor)
      @processor = ::Ractor.make_shareable(processor)
      @configuration = Configuration.new(size:, timeout:)
      @workers = Array.new(@configuration.size) do
        build_worker(@processor)
      end.freeze
      @next_worker = 0
      @shutdown = false
    end
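The constructor passes the processor through `Ractor.make_shareable`, which deep-freezes the object so that it can be referenced from several Ractors. The sketch below shows what that means for a processor with internal state. `UppercaseProcessor` and its `call` method are hypothetical; the interface that `validate_processor!` actually requires is not shown in this page.

```ruby
# Hypothetical processor: the class and its `call` interface are
# assumptions for illustration, not the library's API.
class UppercaseProcessor
  def initialize(prefix)
    @prefix = prefix
  end

  def call(item)
    "#{@prefix}#{item.upcase}"
  end
end

def describe_sharing(processor)
  shared = Ractor.make_shareable(processor)
  {
    same_object: shared.equal?(processor),   # frozen in place, not copied
    frozen: processor.frozen?,
    shareable: Ractor.shareable?(processor)
  }
end

processor = UppercaseProcessor.new(+"cdc:") # unary plus gives a mutable string
p describe_sharing(processor)
puts processor.call("row")
```

One consequence, assuming the real processors behave like this one: a processor cannot mutate its own instance variables after the pool is built, so any per-item state has to live in local variables inside the processing method.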
Instance Method Details
#process(item) ⇒ CDC::Core::ProcessorResult
Process one work item synchronously.
    # File 'lib/cdc/parallel/processor_pool.rb', line 33
    def process(item)
      process_many([item]).fetch(0)
    end
#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
Process many work items using the pre-warmed worker pool.
Results are returned in the same order as the supplied work items.
    # File 'lib/cdc/parallel/processor_pool.rb', line 43
    def process_many(items)
      raise ShutdownError, "processor pool has been shut down" if @shutdown

      work_items = items.map { |item| ::Ractor.make_shareable(item) }
      reply_port = ::Ractor::Port.new

      work_items.each_with_index do |item, index|
        next_worker.send([index, item, reply_port])
      end

      collect_results(reply_port, work_items.length)
    ensure
      reply_port&.close
    end
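The private helpers `next_worker` and `collect_results` are not shown on this page. The following sketch is one plausible reading of them: round-robin dispatch, plus index-tagged replies that restore input order. It is a guess, not the library's code. Queue stands in for Ractor::Port and Thread for a Ractor worker, and `SketchPool` is a hypothetical name.

```ruby
# Hypothetical stand-ins: Queue plays the role of Ractor::Port and Thread
# the role of a Ractor worker. The helper bodies are guesses.
class SketchPool
  def initialize(size, &processor)
    @inboxes = Array.new(size) { Queue.new }
    @threads = @inboxes.map do |inbox|
      Thread.new do
        while (message = inbox.pop)
          index, item, reply = message
          reply << [index, processor.call(item)]
        end
      end
    end
    @next_worker = 0
  end

  def process_many(items)
    reply = Queue.new
    items.each_with_index { |item, index| next_worker << [index, item, reply] }
    collect_results(reply, items.length)
  end

  def stop
    @inboxes.each { |inbox| inbox << nil }
    @threads.each(&:join)
  end

  private

  # Round-robin: hand out inboxes in rotation.
  def next_worker
    inbox = @inboxes[@next_worker]
    @next_worker = (@next_worker + 1) % @inboxes.length
    inbox
  end

  # Replies arrive in completion order; the index restores input order.
  def collect_results(reply, count)
    results = Array.new(count)
    count.times do
      index, value = reply.pop
      results[index] = value
    end
    results
  end
end

pool = SketchPool.new(3) do |n|
  sleep(0.01 * (5 - n)) # later items finish first
  n * n
end
p pool.process_many([1, 2, 3, 4])
pool.stop
```

Even though the later items finish first here, the returned array follows the input order, which matches the ordering guarantee documented for `#process_many`.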
#shutdown ⇒ void
This method returns an undefined value.
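Shut down the pool by sending nil to every worker. Repeated calls return immediately, and workers that have already stopped are skipped.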
    # File 'lib/cdc/parallel/processor_pool.rb', line 61
    def shutdown
      return if @shutdown

      @shutdown = true

      @workers.each do |worker|
        worker.send(nil)
      rescue Ractor::ClosedError
        # Already stopped.
      end
    end
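The two guards in `#shutdown` (the early return and the rescue) can be exercised in isolation. In this sketch a Queue stands in for a worker's inbox, and `ClosedQueueError` plays the role that `Ractor::ClosedError` plays above. `SketchShutdown` and its `signalled` counter are hypothetical and exist only to make the behaviour observable.

```ruby
# Stand-in sketch: Queue replaces the worker inbox, and ClosedQueueError
# stands in for Ractor::ClosedError. SketchShutdown is a hypothetical name.
class SketchShutdown
  attr_reader :signalled

  def initialize(inboxes)
    @inboxes = inboxes
    @shutdown = false
    @signalled = 0
  end

  def shutdown
    return if @shutdown # repeated calls are no-ops

    @shutdown = true
    @inboxes.each do |inbox|
      inbox << nil # nil is the stop sentinel
      @signalled += 1
    rescue ClosedQueueError
      # Already stopped.
    end
  end
end

live = Queue.new
dead = Queue.new.tap(&:close) # simulates a worker that has already exited
stopper = SketchShutdown.new([live, dead])
stopper.shutdown
stopper.shutdown # second call does nothing
puts stopper.signalled
```

Only the live inbox receives the sentinel, and it receives it exactly once, however many times `shutdown` is called.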