Class: CDC::Parallel::ProcessorPool

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/parallel/processor_pool.rb

Overview

Executes one Ractor-safe processor in isolated Ractor workers.

This v0.1 implementation intentionally uses one-shot worker Ractors for deterministic synchronous semantics while preserving the public pool API. The parallel-pool dependency is kept as the runtime foundation for later async/throughput-focused versions.

Instance Method Summary collapse

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Parameters:

  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Float, nil) (defaults to: nil)


16
17
18
19
20
21
22
# File 'lib/cdc/parallel/processor_pool.rb', line 16

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  validate_processor!(processor)

  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size:, timeout:)
  @shutdown = false
end

Instance Method Details

#process(event) ⇒ CDC::Core::ProcessorResult

Process one ChangeEvent.

Parameters:

  • event (CDC::Core::ChangeEvent)

Returns:

  • (CDC::Core::ProcessorResult)

Raises:



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/cdc/parallel/processor_pool.rb', line 28

def process(event)
  raise ShutdownError, "processor pool has been shut down" if @shutdown

  work = ::Ractor.make_shareable(event)
  worker = ::Ractor.new(@processor, work) do |processor, item|
    CDC::Parallel::ResultCollector.normalize(processor.process(item))
  rescue StandardError => e
    CDC::Parallel::ResultCollector.worker_failure(e)
  end

  ResultCollector.normalize(take(worker))
end

#shutdownvoid

This method returns an undefined value.

Shut down the pool.



44
45
46
# File 'lib/cdc/parallel/processor_pool.rb', line 44

def shutdown
  @shutdown = true
end