Class: CDC::Parallel::ProcessorPool
- Inherits: Object
  - Object
  - CDC::Parallel::ProcessorPool
- Defined in: lib/cdc/parallel/processor_pool.rb
Overview
`ProcessorPool` preserves the order of returned results, not the order in which independent items execute. If a sink needs strict ordering by transaction, relation, or primary key, use the ecosystem ordering contract and an ordered dispatcher/runtime above this primitive.
Executes one Ractor-safe `cdc-core` processor across a fixed set of pre-warmed Ractor workers.
`ProcessorPool` is the low-level execution primitive used by Runtime. It accepts normalized `cdc-core` work items, sends them across Ractor boundaries, invokes the configured processor, and returns `CDC::Core::ProcessorResult` objects in input order.
This class is intentionally focused on **CPU-bound parallel execution**. Use it when the processor spends most of its time doing Ruby work such as transformation, enrichment, serialization, compression, scoring, or other in-memory computation. I/O-heavy work belongs outside this class: the CDC ecosystem reserves it for a future fiber-friendly runtime such as `cdc-concurrent`.
## Processor safety contract
The supplied processor must declare `ractor_safe!` on its class. That declaration is treated as the processor author’s explicit promise that the processor object and its dependencies can safely cross a Ractor boundary.
`ProcessorPool` validates this declaration before booting workers.
Declaring `ractor_safe!` does not make unsafe code safe. It only allows the processor to be passed into worker Ractors. Mutable global state, database connections, sockets, caches, file handles, and non-shareable objects still need to be designed carefully by the processor implementor.
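The contract above can be illustrated with a minimal, self-contained sketch. Everything named here is hypothetical: `SafetyDeclaration` stands in for whatever `cdc-core` provides, `check_declaration!` is an assumed shape for the validation step, and `ArgumentError` is an illustrative choice of error class. The only real API used is `Ractor.make_shareable`, which deep-freezes the processor as the constructor does.

```ruby
# Hypothetical stand-in for the declaration API (the real one lives in cdc-core).
module SafetyDeclaration
  def ractor_safe!
    @ractor_safe = true
  end

  def ractor_safe?
    @ractor_safe == true
  end
end

# Stateless processor: nothing needs to change after it is frozen.
class UpcaseProcessor
  extend SafetyDeclaration
  ractor_safe!

  def call(item)
    item.to_s.upcase
  end
end

# Declares safety, but mutates an instance-level cache on every call.
class CachingProcessor
  extend SafetyDeclaration
  ractor_safe!

  def initialize
    @cache = {}
  end

  def call(item)
    @cache[item] ||= item.to_s.upcase
  end
end

# Assumed shape of the check: accept only classes that made the declaration.
def check_declaration!(processor)
  klass = processor.class
  return processor if klass.respond_to?(:ractor_safe?) && klass.ractor_safe?

  raise ArgumentError, "#{klass} must declare ractor_safe!"
end

# The stateless processor keeps working once it has been made shareable.
stateless = Ractor.make_shareable(check_declaration!(UpcaseProcessor.new))
stateless.call("row")

# The caching processor passes the check, yet its frozen cache rejects writes.
caching = Ractor.make_shareable(check_declaration!(CachingProcessor.new))
begin
  caching.call("row")
rescue FrozenError => e
  warn "declared safe, still unsafe: #{e.class}"
end
```

The second processor shows why the declaration is only a promise: it passes validation, but its cache is frozen along with the rest of the object, so the first write fails.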
## Execution model
Workers are created during initialization and reused for all dispatches. This pays Ractor startup cost once and keeps the pool stable even when individual processor calls fail.
The pool uses a fan-out / fan-in pattern:
```text
work items
    |
    v
ProcessorPool
    |
    +----> Worker Ractor 1
    +----> Worker Ractor 2
    +----> Worker Ractor N
    |
    v
ProcessorResult
    |
    v
ordered results
```
Fan-out uses round-robin worker selection. Fan-in collects responses from a reply port and reorders them by submission index, so `process_many` always returns results in the same order as the input array even when work completes out of order.
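The round-robin fan-out and index-based fan-in can be sketched as follows. This is not the pool's implementation: it swaps in `Thread` and `Queue` for worker Ractors and ports so that it runs on any recent Ruby, and `process_in_order` is a hypothetical name. It also omits error handling and timeouts.

```ruby
# Fan-out / fan-in sketch. Threads and Queues stand in for Ractors and ports.
def process_in_order(items, worker_count: 3, &work)
  inboxes = Array.new(worker_count) { Queue.new }
  replies = Queue.new

  # Each worker drains its own inbox and tags every reply with the item index.
  workers = inboxes.map do |inbox|
    Thread.new do
      while (message = inbox.pop) != :stop
        index, item = message
        replies << [index, work.call(item)]
      end
    end
  end

  # Fan-out: item i goes to worker i modulo worker_count (round-robin).
  items.each_with_index do |item, index|
    inboxes[index % worker_count] << [index, item]
  end

  # Fan-in: replies arrive in completion order and are slotted by index.
  results = Array.new(items.length)
  items.length.times do
    index, value = replies.pop
    results[index] = value
  end

  inboxes.each { |inbox| inbox << :stop }
  workers.each(&:join)
  results
end

# Larger numbers sleep longer, so completion order differs from input order.
squares = process_in_order([5, 1, 4, 2, 3]) do |n|
  sleep(n * 0.01)
  n * n
end
p squares # prints [25, 1, 16, 4, 9]
```

Because every reply carries its submission index, the result array matches the input order regardless of which worker finishes first.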
Instance Method Summary
- #initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void (constructor)
  Create a new pool and boot its worker Ractors.
- #process(item) ⇒ CDC::Core::ProcessorResult
  Process one work item synchronously.
- #process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
  Process many work items using the pre-warmed worker pool.
- #shutdown ⇒ void
  Shut down the pool and wait for worker Ractors to exit.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
Create a new pool and boot its worker Ractors.
```ruby
# File 'lib/cdc/parallel/processor_pool.rb', line 112
def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  validate_processor!(processor)

  @processor = ::Ractor.make_shareable(processor)
  @configuration = Configuration.new(size:, timeout:)
  booted_workers = Array.new(@configuration.size) do
    build_worker(@processor)
  end
  @workers = booted_workers.map(&:first).freeze
  @worker_inboxes = booted_workers.map(&:last).freeze
  @next_worker = 0
  @dispatch_mutex = Mutex.new
  @shutdown = false
end
```
Instance Method Details
#process(item) ⇒ CDC::Core::ProcessorResult
Process one work item synchronously.
This is a convenience wrapper around #process_many. The work still executes inside a worker Ractor; the call blocks until the corresponding `CDC::Core::ProcessorResult` is available or until the optional timeout is reached.
```ruby
# File 'lib/cdc/parallel/processor_pool.rb', line 143
def process(item)
  process_many([item]).fetch(0)
end
```
#process_many(items) ⇒ Array<CDC::Core::ProcessorResult>
Process many work items using the pre-warmed worker pool.
Each item is made shareable before dispatch. Items are assigned to worker inboxes using round-robin selection. Responses are collected through a per-call reply port and returned in the same order as the input array.
```ruby
# File 'lib/cdc/parallel/processor_pool.rb', line 160
def process_many(items)
  work_items = items.map { |item| ::Ractor.make_shareable(item) }
  reply_port = ::Ractor::Port.new

  dispatch(work_items, reply_port)
  collect_results(reply_port, work_items.length)
ensure
  reply_port&.close
end
```
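The effect of making each item shareable before dispatch can be seen with plain Ruby. The work item below is a hypothetical hash, not a real `cdc-core` type. The sketch relies only on `Ractor.make_shareable` and `Ractor.shareable?`, and on the fact that `make_shareable` freezes the object in place unless `copy: true` is passed.

```ruby
# Hypothetical work item; real items are normalized cdc-core objects.
item = { "table" => "orders", "after" => { "id" => 42, "tags" => ["new", "paid"] } }

shareable = Ractor.make_shareable(item)

puts shareable.equal?(item)          # same object, frozen in place: true
puts Ractor.shareable?(shareable)    # true
puts item["after"]["tags"].frozen?   # nested values are frozen too: true

# Any later mutation by the caller now fails.
begin
  item["after"]["tags"] << "shipped"
rescue FrozenError => e
  warn "cannot mutate after dispatch: #{e.class}"
end
```

Since the method shown above calls `make_shareable` without `copy: true`, callers should expect the items they pass in to be deeply frozen afterwards.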
#shutdown ⇒ void
This method returns an undefined value.
Shut down the pool and wait for worker Ractors to exit.
Shutdown is idempotent. The first caller signals all worker inboxes with a stop message and waits for workers to join. Later calls return without doing anything.
```ruby
# File 'lib/cdc/parallel/processor_pool.rb', line 178
def shutdown
  @dispatch_mutex.synchronize do
    return if @shutdown

    @shutdown = true
    signal_workers
  end

  wait_for_workers
end
```
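The idempotent shutdown pattern can be tried in isolation with the sketch below. `StoppableWorkers` is a hypothetical class that uses threads and queues in place of worker Ractors and inboxes. The `stop_signals_sent` counter exists only to make the behavior observable.

```ruby
# Idempotent shutdown sketch. Threads stand in for worker Ractors.
class StoppableWorkers
  attr_reader :stop_signals_sent

  def initialize(count)
    @inboxes = Array.new(count) { Queue.new }
    # Each worker discards messages until it receives :stop.
    @threads = @inboxes.map do |inbox|
      Thread.new { nil until inbox.pop == :stop }
    end
    @mutex = Mutex.new
    @shutdown = false
    @stop_signals_sent = 0
  end

  def shutdown
    @mutex.synchronize do
      # Later callers leave here without signalling or waiting again.
      return if @shutdown

      @shutdown = true
      @inboxes.each do |inbox|
        inbox << :stop
        @stop_signals_sent += 1
      end
    end

    # Joining happens outside the lock so the mutex is held only briefly.
    @threads.each(&:join)
    nil
  end
end

workers = StoppableWorkers.new(2)
workers.shutdown
workers.shutdown
puts workers.stop_signals_sent # prints 2
```

Setting the flag and sending stop messages under the same mutex means concurrent callers cannot both signal the workers, while the wait itself stays outside the critical section.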