Class: CDC::Concurrent::ProcessorPool
- Inherits:
-
Object
- Object
- CDC::Concurrent::ProcessorPool
- Defined in:
- lib/cdc/concurrent/processor_pool.rb
Overview
Executes one concurrent-safe processor using Async tasks.
ARCHITECTURAL NOTE
cdc-concurrent implements the same fan-out / fan-in execution pattern used by cdc-parallel. The runtime differs, but the processor contract and result contract remain the same.
events
|
v
fan-out
|
+----> Async task
+----> Async task
+----> Async task
|
v
fan-in
|
v
ProcessorResult array
Fan-out:
-
Events are dispatched into Async tasks.
-
Async::Semaphore bounds the number of concurrently running tasks.
-
Multiple events may make progress concurrently under Ruby’s scheduler.
Fan-in:
-
Tasks append indexed ProcessorResult values into a shared collection.
-
Results may complete out of execution order.
-
When preserve_order is enabled, ProcessorPool sorts by submission index so the returned array matches the input order.
Relationship to cdc-parallel:
-
cdc-concurrent performs fan-out using Async tasks and cooperative concurrency.
-
cdc-parallel performs fan-out using pre-warmed Ractor workers and true parallel execution.
-
Both runtimes preserve the same processor contract and return CDC::Core::ProcessorResult objects.
Processor authors should be able to switch runtimes without changing processor behavior when their processor satisfies the selected runtime’s safety declaration.
Instance Method Summary collapse
-
#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void
constructor
Builds an Async-backed processor pool.
-
#process(event) ⇒ CDC::Core::ProcessorResult
Processes one event synchronously through the Async runtime.
-
#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>
Processes many events through bounded Async fan-out.
-
#shutdown ⇒ void
Prevents new work from being submitted to the pool.
Constructor Details
#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void
Builds an Async-backed processor pool.
62 63 64 65 66 67 68 |
# File 'lib/cdc/concurrent/processor_pool.rb', line 62 def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) validate_processor!(processor) @processor = processor @configuration = Configuration.new(concurrency:, timeout:, preserve_order:) @shutdown = false end |
Instance Method Details
#process(event) ⇒ CDC::Core::ProcessorResult
Processes one event synchronously through the Async runtime.
75 76 77 78 79 |
# File 'lib/cdc/concurrent/processor_pool.rb', line 75 def process(event) raise ShutdownError, "processor pool has been shut down" if @shutdown process_one(event) end |
#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>
Processes many events through bounded Async fan-out.
When preserve_order is true, the returned array matches the order of the supplied events even if individual tasks complete out of order.
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/cdc/concurrent/processor_pool.rb', line 89 def process_many(events) raise ShutdownError, "processor pool has been shut down" if @shutdown return empty_results if events.empty? # @type var indexed_results: Array[[Integer, CDC::Core::ProcessorResult]] indexed_results = [] process_batch(events, indexed_results) indexed_results.sort_by!(&:first) if @configuration.preserve_order indexed_results.map(&:last).freeze end |
#shutdown ⇒ void
This method returns an undefined value.
Prevents new work from being submitted to the pool.
105 106 107 |
# File 'lib/cdc/concurrent/processor_pool.rb', line 105 def shutdown @shutdown = true end |