Class: CDC::Parallel::ProcessorPool
- Inherits: Object
  - Object
  - CDC::Parallel::ProcessorPool
- Defined in: lib/cdc/parallel/processor_pool.rb
Overview
Executes one Ractor-safe processor in isolated Ractor workers.
This v0.1 implementation intentionally uses one-shot worker Ractors: each #process call spawns a fresh Ractor and waits for its result. This gives deterministic, synchronous semantics while preserving the public pool API. The parallel-pool dependency is kept as the runtime foundation for later async/throughput-focused versions.
Instance Method Summary
- #initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void (constructor)
- #process(event) ⇒ CDC::Core::ProcessorResult
  Process one ChangeEvent.
- #shutdown ⇒ void
  Shut down the pool.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
# File 'lib/cdc/parallel/processor_pool.rb', line 16
def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  validate_processor!(processor)
  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size:, timeout:)
  @shutdown = false
end
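Because the constructor passes the processor through Ractor.make_shareable, the object handed in is deep-frozen in place rather than copied. The sketch below illustrates that side effect with plain Ruby; TaggingProcessor is a hypothetical class invented for this example and is not part of the library.

```ruby
# Hypothetical processor with nested state, for illustration only.
class TaggingProcessor
  attr_reader :tags

  def initialize(tags)
    @tags = tags
  end

  def process(item)
    "#{item} [#{tags.join(',')}]"
  end
end

processor = TaggingProcessor.new(["cdc", "orders"])
Ractor.shareable?(processor) # false: the object is still mutable

# make_shareable freezes the object and everything it references,
# then returns that same object (no copy is made).
shared = Ractor.make_shareable(processor)
shared.equal?(processor)     # true
processor.tags.frozen?       # true

# Any later mutation of the caller's reference now raises FrozenError.
begin
  processor.tags << "late"
rescue FrozenError => e
  warn "cannot mutate a shared processor: #{e.class}"
end
```

In practice this suggests finishing all processor configuration before handing it to the pool.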
Instance Method Details
#process(event) ⇒ CDC::Core::ProcessorResult
Process one ChangeEvent.
# File 'lib/cdc/parallel/processor_pool.rb', line 28
def process(event)
  raise ShutdownError, "processor pool has been shut down" if @shutdown

  work = ::Ractor.make_shareable(event)
  worker = ::Ractor.new(@processor, work) do |processor, item|
    CDC::Parallel::ResultCollector.normalize(processor.process(item))
  rescue StandardError => e
    CDC::Parallel::ResultCollector.worker_failure(e)
  end

  ResultCollector.normalize(take(worker))
end
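The one-shot pattern used by #process can be sketched in isolation as follows. This is a minimal illustration, not the library's code: FlakyProcessor, process_isolated, and await_result are hypothetical names, and a plain Hash stands in for CDC::Core::ProcessorResult. The await_result helper is an assumption about how to wait portably, since newer Ruby versions replace Ractor#take with Ractor#value.

```ruby
# Hypothetical Ractor-safe processor: no mutable state, may raise.
class FlakyProcessor
  def process(item)
    raise ArgumentError, "empty event" if item.empty?

    item.length
  end
end

# Wait for a Ractor's block result on both older and newer Rubies.
def await_result(ractor)
  ractor.respond_to?(:value) ? ractor.value : ractor.take
end

# Spawn one Ractor for one item and turn worker errors into a failure value
# instead of letting them propagate to the caller.
def process_isolated(processor, item)
  shared_processor = Ractor.make_shareable(processor)
  shared_item = Ractor.make_shareable(item)

  worker = Ractor.new(shared_processor, shared_item) do |proc_obj, work|
    { ok: true, value: proc_obj.process(work) }
  rescue StandardError => e
    { ok: false, error: "#{e.class}: #{e.message}" }
  end

  await_result(worker)
end

success = process_isolated(FlakyProcessor.new, "row-1")
failure = process_isolated(FlakyProcessor.new, "")
```

Here success carries the processor's return value and failure carries a description of the ArgumentError, so the caller always receives a result object rather than an exception raised across the Ractor boundary.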
#shutdown ⇒ void
This method returns an undefined value.
Shut down the pool.
# File 'lib/cdc/parallel/processor_pool.rb', line 44
def shutdown
  @shutdown = true
end
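As the source shows, #shutdown only sets a flag, and #process checks that flag before doing any work. The following stand-in mirrors that guard without Ractors so the behavior is easy to see; SketchPool and its block-based handler are hypothetical, and ShutdownError is redefined locally for the example.

```ruby
# Local stand-in for the library's ShutdownError.
class ShutdownError < StandardError; end

# Hypothetical pool that mirrors only the shutdown guard.
class SketchPool
  def initialize(&handler)
    @handler = handler
    @shutdown = false
  end

  def process(event)
    raise ShutdownError, "processor pool has been shut down" if @shutdown

    @handler.call(event)
  end

  # Setting the flag twice is harmless, so repeated calls are safe.
  def shutdown
    @shutdown = true
  end
end

pool = SketchPool.new { |event| event.to_s.upcase }
pool.process("update") # works while the pool is open

pool.shutdown
pool.shutdown          # second call has no further effect

begin
  pool.process("update")
rescue ShutdownError => e
  warn e.message
end
```

Callers that may race with shutdown would likely want to rescue ShutdownError around #process, as in the last lines of the sketch.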