Class: CDC::Parallel::ProcessorPool

Inherits:
Object
Defined in:
lib/cdc/parallel/processor_pool.rb

Overview

Executes one Ractor-safe processor in pre-warmed persistent Ractor workers.

Workers are created during initialization and reused for every dispatch, so the Ractor startup cost is paid once. Workers stay alive after processor failures. The pool offers both synchronous single-item processing and batched dispatch for throughput-oriented benchmarks and runtimes.


Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Parameters:

  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Float, nil) (defaults to: nil)


# File 'lib/cdc/parallel/processor_pool.rb', line 16

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  validate_processor!(processor)

  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size:, timeout:)
  @workers = Array.new(@configuration.size) do
    build_worker(@processor)
  end.freeze

  @next_worker = 0
  @shutdown = false
end
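
The constructor passes the processor through `Ractor.make_shareable`, which deep-freezes the object so that every worker can hold a reference to it. The sketch below shows that effect on a stand-in object. `UpcaseProcessor`, its `prefix` attribute, and its `call` method are hypothetical; the real `CDC::Core::Processor` interface and the checks in `validate_processor!` are not shown on this page.

```ruby
# Hypothetical processor used only for illustration.
class UpcaseProcessor
  attr_reader :prefix

  def initialize(prefix)
    @prefix = prefix
  end

  def call(item)
    "#{prefix}#{item.upcase}"
  end
end

# +"row:" yields an unfrozen string, so the freeze below is observable.
processor = Ractor.make_shareable(UpcaseProcessor.new(+"row:"))

# The object and everything reachable from it are now frozen.
object_frozen = processor.frozen?
state_frozen  = processor.prefix.frozen?
shareable     = Ractor.shareable?(processor)

# Read-only behaviour still works after freezing.
output = processor.call("a")
```

A practical consequence is that a processor which mutates its own instance variables while processing would raise `FrozenError` once it has been made shareable.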

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult

Process one work item synchronously.

Parameters:

  • item (Object)

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/parallel/processor_pool.rb', line 33

def process(item)
  process_many([item]).fetch(0)
end
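
`process` wraps the item in a one-element array and reads the first result with `fetch(0)` rather than `[0]`. The page does not state why; the plain-Ruby illustration below only shows how the two differ when a result is missing.

```ruby
empty = []

# Index access silently returns nil for a missing element.
by_index = empty[0]

# fetch raises instead, so a missing result cannot pass as a nil value.
error = begin
  empty.fetch(0)
rescue IndexError => e
  e
end
```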

#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>

Process many work items using the pre-warmed worker pool.

Results are returned in the same order as the supplied work items.

Parameters:

  • items (Array<Object>)

Returns:

  • (Array<CDC::Core::ProcessorResult>)


# File 'lib/cdc/parallel/processor_pool.rb', line 43

def process_many(items)
  raise ShutdownError, "processor pool has been shut down" if @shutdown

  work_items = items.map { |item| ::Ractor.make_shareable(item) }
  reply_port = ::Ractor::Port.new

  work_items.each_with_index do |item, index|
    next_worker.send([index, item, reply_port])
  end

  collect_results(reply_port, work_items.length)
ensure
  reply_port&.close
end
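
Each message sent to a worker carries the item's index, and that index is what allows results to come back in the order the items were supplied, even when workers finish out of order. The sketch below shows the pattern with `Thread` and `Queue` as stand-ins for Ractor workers and `Ractor::Port`, so it runs on Ruby versions without `Ractor::Port`. It assumes that `next_worker` rotates through the workers (as the `@next_worker` counter suggests) and that `collect_results` places each reply by index. Neither helper is shown on this page, and `build_worker` and the doubling step are hypothetical.

```ruby
# Stand-in worker: a thread with an inbox queue.
def build_worker
  inbox = Queue.new
  thread = Thread.new do
    # Each message carries its index, the item, and where to reply.
    while (message = inbox.pop)
      index, item, reply = message
      reply << [index, item * 2] # hypothetical "processing"
    end
  end
  [inbox, thread]
end

workers = Array.new(3) { build_worker }
items = [10, 20, 30, 40, 50]
reply = Queue.new

# Round-robin dispatch, tagging each item with its position.
items.each_with_index do |item, index|
  inbox, = workers[index % workers.length]
  inbox << [index, item, reply]
end

# Replies may arrive in any order; the index puts each one back in place.
results = Array.new(items.length)
items.length.times do
  index, value = reply.pop
  results[index] = value
end

# A nil message ends each worker loop.
workers.each do |inbox, thread|
  inbox << nil
  thread.join
end
```

Note that the real method calls `Ractor.make_shareable` on every item, which freezes the caller's objects in place rather than copying them.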

#shutdown ⇒ void

This method returns an undefined value.

Shut down the pool.



# File 'lib/cdc/parallel/processor_pool.rb', line 61

def shutdown
  return if @shutdown

  @shutdown = true

  signal_workers
  wait_for_workers
end
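
Taken together with the guard at the top of `process_many`, the `@shutdown` flag gives the pool a simple lifecycle: repeated `shutdown` calls are no-ops, and dispatch after shutdown raises `ShutdownError`. The sketch below reproduces only that lifecycle. `TinyPool` and `stop_count` are hypothetical, and the namespace of the real `ShutdownError` is not shown on this page.

```ruby
# Hypothetical stand-in, not the real ProcessorPool.
class TinyPool
  ShutdownError = Class.new(StandardError)

  attr_reader :stop_count

  def initialize
    @shutdown = false
    @stop_count = 0
  end

  def process_many(items)
    raise ShutdownError, "processor pool has been shut down" if @shutdown

    items.map(&:to_s)
  end

  def shutdown
    return if @shutdown # later calls are no-ops

    @shutdown = true
    @stop_count += 1 # stands in for signal_workers / wait_for_workers
  end
end

pool = TinyPool.new
before = pool.process_many([1, 2])

pool.shutdown
pool.shutdown # no effect the second time

# Dispatch after shutdown is rejected.
rejected = begin
  pool.process_many([3])
  false
rescue TinyPool::ShutdownError
  true
end
```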