Class: CDC::Parallel::ProcessorPool

Inherits:
Object
Defined in:
lib/cdc/parallel/processor_pool.rb

Overview

Note:

`ProcessorPool` preserves the order of returned results, not the order in which independent items execute. If a sink needs strict ordering by transaction, relation, or primary key, use the ecosystem ordering contract and an ordered dispatcher/runtime above this primitive.

Executes one Ractor-safe `cdc-core` processor across a fixed set of pre-warmed Ractor workers.

`ProcessorPool` is the low-level execution primitive used by Runtime. It accepts normalized `cdc-core` work items, sends them across Ractor boundaries, invokes the configured processor, and returns `CDC::Core::ProcessorResult` objects in input order.

This class is intentionally focused on **CPU-bound parallel execution**. Use it when the processor spends most of its time doing Ruby work such as transformation, enrichment, serialization, compression, scoring, or other in-memory computation. I/O-heavy work falls outside this class; within the CDC ecosystem it belongs to a future fiber-friendly runtime such as `cdc-concurrent`.

## Processor safety contract

The supplied processor must declare `ractor_safe!` on its class. That declaration is treated as the processor author's explicit promise that the processor object and its dependencies can safely cross a Ractor boundary.

`ProcessorPool` validates this declaration before booting workers and raises `UnsafeProcessorError` when it is missing.

Declaring `ractor_safe!` does not make unsafe code safe. It only allows the processor to be passed into worker Ractors. Mutable global state, database connections, sockets, caches, file handles, and non-shareable objects still need to be designed carefully by the processor implementor.
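
The opt-in check described above can be sketched in plain Ruby. This is a minimal illustration only: `SketchProcessor`, `SafeProcessor`, and `UndeclaredProcessor` are hypothetical stand-ins, and the bodies of `ractor_safe!`, `ractor_safe?`, and `validate_processor!` are assumptions rather than the actual `cdc-core` or `cdc-parallel` source.

```ruby
# Hypothetical stand-ins: the names mirror this page, but the bodies are
# assumptions, not the cdc-core or cdc-parallel implementation.
class UnsafeProcessorError < StandardError; end

class SketchProcessor
  # Class-level opt-in, analogous to `ractor_safe!`.
  def self.ractor_safe!
    @ractor_safe = true
  end

  # True only for classes that called `ractor_safe!` themselves.
  def self.ractor_safe?
    @ractor_safe == true
  end
end

class SafeProcessor < SketchProcessor
  ractor_safe!
end

class UndeclaredProcessor < SketchProcessor
end

# Reject any processor whose class has not opted in.
def validate_processor!(processor)
  klass = processor.class
  return processor if klass.respond_to?(:ractor_safe?) && klass.ractor_safe?

  raise UnsafeProcessorError, "#{klass} has not declared ractor_safe!"
end

validate_processor!(SafeProcessor.new) # passes
```

In this sketch the flag lives in a class-level instance variable, so a subclass does not inherit the declaration and must opt in itself. Whether the real declaration is inherited is not stated on this page.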

## Execution model

Workers are created during initialization and reused for all dispatches. This pays Ractor startup cost once and keeps the pool stable even when individual processor calls fail.

The pool uses a fan-out / fan-in pattern:

```text
work items
    |
    v
ProcessorPool
    |
    +----> Worker Ractor 1
    +----> Worker Ractor 2
    +----> Worker Ractor N
              |
              v
       ProcessorResult
              |
              v
        ordered results
```

Fan-out uses round-robin worker selection. Fan-in collects responses from a reply port and reorders them by submission index, so `process_many` always returns results in the same order as the input array even when work completes out of order.
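
The round-robin fan-out and index-based fan-in can be illustrated without any Ractor API. The sketch below deliberately substitutes threads and `Queue` objects for worker Ractors and ports so that it runs on any recent Ruby; `fan_out_fan_in` is a hypothetical helper, not a method of `ProcessorPool`.

```ruby
# Threads and Queues stand in for worker Ractors and ports. This shows the
# pattern only; it is not the pool's implementation.
def fan_out_fan_in(items, worker_count:, &work)
  inboxes = Array.new(worker_count) { Queue.new }
  replies = Queue.new

  workers = inboxes.map do |inbox|
    Thread.new do
      # Each worker drains its inbox until it sees the :stop sentinel.
      while (message = inbox.pop) != :stop
        index, item = message
        replies << [index, work.call(item)]
      end
    end
  end

  # Fan-out: item i goes to worker i % worker_count (round-robin).
  items.each_with_index do |item, index|
    inboxes[index % worker_count] << [index, item]
  end

  # Fan-in: replies arrive in completion order; slot each by its index.
  results = Array.new(items.length)
  items.length.times do
    index, value = replies.pop
    results[index] = value
  end

  inboxes.each { |inbox| inbox << :stop }
  workers.each(&:join)
  results.freeze
end

ordered = fan_out_fan_in([30, 10, 20], worker_count: 2) do |n|
  sleep(n / 1000.0) # shorter sleeps let later items finish first
  n * 2
end
```

Because every reply carries its submission index, `ordered` matches the input order regardless of which worker finishes first.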

Examples:

Declaring a processor as Ractor-safe

class AnalyticsProcessor < CDC::Core::Processor
  ractor_safe!

  def process(event)
    CDC::Core::ProcessorResult.success(event)
  end
end

pool = CDC::Parallel::ProcessorPool.new(
  processor: AnalyticsProcessor.new,
  size: 4
)

Processing one item

result = pool.process(event)
result.success? #=> true

Processing a batch while preserving result order

results = pool.process_many([event_a, event_b, event_c])
results.map(&:success?)

Shutting down explicitly

pool.shutdown

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Create a new pool and boot its worker Ractors.

Parameters:

  • processor (CDC::Core::Processor)

    Processor instance used by every worker. Its class must respond to `ractor_safe?` and return `true`.

  • size (Integer) (defaults to: Etc.nprocessors)

    Number of worker Ractors to boot. Defaults to `Etc.nprocessors`.

  • timeout (Numeric, nil) (defaults to: nil)

    Optional timeout, in seconds, used when waiting for worker results and during shutdown. `nil` means wait indefinitely.

Raises:

  • (UnsafeProcessorError)

    Raised when the processor class has not declared `ractor_safe!`.

  • (ArgumentError)

    Raised by Configuration when `size` or `timeout` is invalid.



# File 'lib/cdc/parallel/processor_pool.rb', line 112

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  validate_processor!(processor)

  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size:, timeout:)
  booted_workers = Array.new(@configuration.size) do
    build_worker(@processor)
  end

  @workers = booted_workers.map(&:first).freeze
  @worker_inboxes = booted_workers.map(&:last).freeze

  @next_worker = 0
  @dispatch_mutex = Mutex.new
  @shutdown = false
end

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult

Process one work item synchronously.

This is a convenience wrapper around #process_many. The work still executes inside a worker Ractor; the call blocks until the corresponding `CDC::Core::ProcessorResult` is available or until the optional timeout is reached.

Parameters:

  • item (Object)

    Shareable work item, usually a `CDC::Core::ChangeEvent`.

Returns:

  • (CDC::Core::ProcessorResult)

    Normalized processor result. Processor exceptions are captured as failure results rather than escaping directly from the worker Ractor.

Raises:



# File 'lib/cdc/parallel/processor_pool.rb', line 143

def process(item)
  process_many([item]).fetch(0)
end
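
The Returns note above says processor exceptions come back as failure results instead of escaping the worker. A minimal sketch of that capture pattern follows, assuming a hypothetical `SketchResult` struct and `call_safely` helper; neither is part of `cdc-core`, and the real `CDC::Core::ProcessorResult` interface may differ.

```ruby
# Hypothetical result type, used only for this illustration.
SketchResult = Struct.new(:status, :value, :error, keyword_init: true) do
  def success?
    status == :success
  end
end

# Run the block and convert any StandardError into a failure result, so
# the caller always receives a result object rather than an exception.
def call_safely(item)
  SketchResult.new(status: :success, value: yield(item))
rescue StandardError => e
  SketchResult.new(status: :failure, error: e)
end

ok     = call_safely(4) { |n| n * 2 }
failed = call_safely(4) { |n| n / 0 } # ZeroDivisionError is captured
```

Callers can then branch on `success?` for each result rather than wrapping every dispatch in `begin`/`rescue`.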

#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>

Process many work items using the pre-warmed worker pool.

Each item is made shareable before dispatch. Items are assigned to worker inboxes using round-robin selection. Responses are collected through a per-call reply port and returned in the same order as the input array.

Parameters:

  • items (Array<Object>)

    Work items to process. Empty arrays are valid and return an empty frozen array.

Returns:

  • (Array<CDC::Core::ProcessorResult>)

    Frozen array of normalized results, ordered to match `items`.

Raises:



# File 'lib/cdc/parallel/processor_pool.rb', line 160

def process_many(items)
  work_items = items.map { |item| ::Ractor.make_shareable(item) }
  reply_port = ::Ractor::Port.new

  dispatch(work_items, reply_port)

  collect_results(reply_port, work_items.length)
ensure
  reply_port&.close
end
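
Since `process_many` passes every item through `Ractor.make_shareable`, it helps to see what that call does to an ordinary object. The snippet below uses only the core `Ractor.make_shareable` and `Ractor.shareable?` methods; the `event` hash is a made-up example, not a `CDC::Core::ChangeEvent`.

```ruby
# A plain nested structure standing in for a work item (hypothetical data).
event = { table: "users", row: { id: 1, tags: ["new"] } }

# make_shareable deep-freezes the object in place and returns it.
shareable = Ractor.make_shareable(event)

same_object = shareable.equal?(event)          # the caller's object itself
deep_frozen = event[:row][:tags].frozen?       # nested values are frozen too
is_shareable = Ractor.shareable?(shareable)

# Mutating the item after dispatch is no longer possible.
mutation_blocked =
  begin
    event[:row][:tags] << "changed"
    false
  rescue FrozenError
    true
  end
```

The practical consequence is that work items handed to the pool should be treated as immutable from that point on, because the freeze applies to the caller's own object rather than to a copy.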

#shutdown ⇒ void

This method returns an undefined value.

Shut down the pool and wait for worker Ractors to exit.

Shutdown is idempotent. The first caller signals all worker inboxes with a stop message and waits for workers to join. Later calls return without doing anything.



# File 'lib/cdc/parallel/processor_pool.rb', line 178

def shutdown
  @dispatch_mutex.synchronize do
    return if @shutdown

    @shutdown = true
    signal_workers
  end

  wait_for_workers
end
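
The idempotency guarantee comes from checking and setting the flag inside one mutex-protected section. The following sketch isolates that pattern: `SketchPool` is a hypothetical class, and its `stop_signals` counter is an assumption standing in for `signal_workers`, whose body is not shown on this page.

```ruby
# Hypothetical pool that only counts how often workers would be signalled.
class SketchPool
  attr_reader :stop_signals

  def initialize
    @mutex = Mutex.new
    @shutdown = false
    @stop_signals = 0
  end

  def shutdown
    @mutex.synchronize do
      # Later callers see the flag and return before signalling again.
      return if @shutdown

      @shutdown = true
      @stop_signals += 1 # stand-in for signalling worker inboxes
    end

    :stopped # stand-in for waiting on workers; only the first caller gets here
  end
end

pool = SketchPool.new
callers = Array.new(8) { Thread.new { pool.shutdown } }
callers.each(&:join)
```

Even with eight concurrent callers, the check and the flag update happen atomically, so the stop signal is sent exactly once.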