Class: CDC::Concurrent::ProcessorPool

Inherits:
Object
Defined in:
lib/cdc/concurrent/processor_pool.rb

Overview

Executes one concurrent-safe processor using Async tasks.

ARCHITECTURAL NOTE

cdc-concurrent implements the same fan-out / fan-in execution pattern used by cdc-parallel. The runtime differs, but the processor contract and result contract remain the same.

events
   |
   v
fan-out
   |
   +----> Async task
   +----> Async task
   +----> Async task
   |
   v
fan-in
   |
   v
ProcessorResult array

Fan-out:

  • Events are dispatched into Async tasks.

  • Async::Semaphore bounds the number of concurrently running tasks.

  • Multiple events may make progress concurrently under Ruby’s scheduler.

Fan-in:

  • Tasks append indexed ProcessorResult values to a shared collection.

  • Tasks may complete out of submission order.

  • When preserve_order is enabled, ProcessorPool sorts by submission index so the returned array matches the input order.
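The fan-out / fan-in flow above can be sketched with the standard library alone. This is not the pool's implementation: it swaps Async tasks for threads and Async::Semaphore for a SizedQueue used as a counting semaphore, so that it runs without the async gem. The method name bounded_fan_out and its parameters are hypothetical.

```ruby
# Sketch only: threads stand in for Async tasks, and a SizedQueue stands in
# for Async::Semaphore. All names here are hypothetical.
def bounded_fan_out(items, limit:, preserve_order: true)
  permits = SizedQueue.new(limit) # holds at most `limit` tokens
  lock = Mutex.new
  indexed_results = []

  threads = items.each_with_index.map do |item, index|
    permits.push(true) # blocks while `limit` tasks are already running
    Thread.new do
      result = yield(item)
      # Fan-in: record the submission index alongside the result.
      lock.synchronize { indexed_results << [index, result] }
    ensure
      permits.pop # release the permit even if the block raised
    end
  end

  threads.each(&:join)

  # Restore submission order only when asked to, then drop the indexes.
  indexed_results.sort_by!(&:first) if preserve_order
  indexed_results.map(&:last).freeze
end

# Earlier items sleep longer, so completion order differs from input order.
doubled = bounded_fan_out([3, 2, 1], limit: 2) do |n|
  sleep(n * 0.01)
  n * 2
end
```

With preserve_order left at its default, doubled follows the input order regardless of which task finished first. Passing preserve_order: false would return the results in whatever order the tasks appended them.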

Relationship to cdc-parallel:

  • cdc-concurrent performs fan-out using Async tasks and cooperative concurrency.

  • cdc-parallel performs fan-out using pre-warmed Ractor workers and true parallel execution.

  • Both runtimes preserve the same processor contract and return CDC::Core::ProcessorResult objects.

Processor authors should be able to switch runtimes without changing processor behavior when their processor satisfies the selected runtime’s safety declaration.

Constructor Details

#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void

Builds an Async-backed processor pool.

Parameters:

  • processor (CDC::Core::Processor)

    Processor instance that declares concurrent_safe!.

  • concurrency (Integer) (defaults to: 100)

    Maximum number of Async tasks allowed to run at once.

  • timeout (Float, nil) (defaults to: nil)

    Optional per-event processing timeout in seconds.

  • preserve_order (Boolean) (defaults to: true)

    Whether process_many should return results in input order.

Raises:



# File 'lib/cdc/concurrent/processor_pool.rb', line 62

def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
  validate_processor!(processor)

  @processor = processor
  @configuration = Configuration.new(concurrency:, timeout:, preserve_order:)
  @shutdown = false
end
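The constructor hands its options to Configuration, whose source is not shown on this page. As an assumption about what such a value object might look like, the sketch below uses a hypothetical PoolConfiguration struct with the same three keywords and defaults. The validation rules are illustrative guesses, not the gem's behavior.

```ruby
# Hypothetical stand-in for the pool's Configuration. The real class may
# validate differently, or not at all.
PoolConfiguration = Struct.new(:concurrency, :timeout, :preserve_order, keyword_init: true) do
  def initialize(concurrency: 100, timeout: nil, preserve_order: true)
    unless concurrency.is_a?(Integer) && concurrency.positive?
      raise ArgumentError, "concurrency must be a positive Integer"
    end
    unless timeout.nil? || (timeout.is_a?(Numeric) && timeout.positive?)
      raise ArgumentError, "timeout must be nil or a positive number"
    end

    super(concurrency: concurrency, timeout: timeout, preserve_order: preserve_order ? true : false)
    freeze # settings do not change after construction
  end
end

defaults = PoolConfiguration.new
tuned = PoolConfiguration.new(concurrency: 8, timeout: 1.5, preserve_order: false)
```

Keeping the options in one frozen object means the pool reads them through a single reference, as process_many does with @configuration.preserve_order.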

Instance Method Details

#process(event) ⇒ CDC::Core::ProcessorResult

Processes one event synchronously through the Async runtime.

Parameters:

  • event (CDC::Core::ChangeEvent)

    Event to process.

Returns:

  • (CDC::Core::ProcessorResult)

    Normalized processor result.

Raises:

  • (ShutdownError)

    If the pool has been shut down.


# File 'lib/cdc/concurrent/processor_pool.rb', line 75

def process(event)
  raise ShutdownError, "processor pool has been shut down" if @shutdown

  process_one(event)
end

#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>

Processes many events through bounded Async fan-out.

When preserve_order is true, the returned array matches the order of the supplied events even if individual tasks complete out of order.

Parameters:

  • events (Array<CDC::Core::ChangeEvent>)

    Events to process.

Returns:

  • (Array<CDC::Core::ProcessorResult>)

    Frozen array of normalized results.

Raises:

  • (ShutdownError)

    If the pool has been shut down.


# File 'lib/cdc/concurrent/processor_pool.rb', line 89

def process_many(events)
  raise ShutdownError, "processor pool has been shut down" if @shutdown
  return empty_results if events.empty?

  # @type var indexed_results: Array[[Integer, CDC::Core::ProcessorResult]]
  indexed_results = []

  process_batch(events, indexed_results)

  indexed_results.sort_by!(&:first) if @configuration.preserve_order
  indexed_results.map(&:last).freeze
end
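The last two lines of process_many can be tried in isolation. In this small sketch, strings stand in for ProcessorResult objects and the sample pairs are made up. It shows that the sort restores submission order and that the returned array is frozen.

```ruby
# Pairs of [submission_index, result], in the order tasks happened to finish.
indexed_results = [[2, "c"], [0, "a"], [1, "b"]]

# Without the sort, results come back in completion order.
completion_order = indexed_results.map(&:last)

# With preserve_order enabled, the pairs are sorted by index first.
indexed_results.sort_by!(&:first)
submission_order = indexed_results.map(&:last).freeze

# The result is frozen, so callers that need to modify it must copy it first.
mutation_error =
  begin
    submission_order << "d"
  rescue FrozenError => e
    e.class
  end
extended = submission_order.dup << "d"
```

Only the outer array is frozen by this step. Whether the individual results are themselves immutable depends on CDC::Core::ProcessorResult, which is not shown here.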

#shutdown ⇒ void

This method returns an undefined value.

Prevents new work from being submitted to the pool.



# File 'lib/cdc/concurrent/processor_pool.rb', line 105

def shutdown
  @shutdown = true
end
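Because shutdown only sets a flag, the rejection itself happens in the guard at the top of process and process_many. The sketch below reproduces that guard in a hypothetical stand-in class, TinyPool, with its own locally defined ShutdownError, to show what a caller sees before and after shutdown. It is not the real pool, and its process body is a placeholder.

```ruby
# Hypothetical stand-in that copies only the shutdown guard from the pool.
class TinyPool
  ShutdownError = Class.new(StandardError) # local stand-in for the gem's error

  def initialize
    @shutdown = false
  end

  def process(event)
    raise ShutdownError, "processor pool has been shut down" if @shutdown

    event.to_s.upcase # placeholder for real processing
  end

  def shutdown
    @shutdown = true
  end
end

pool = TinyPool.new
before = pool.process(:insert)

pool.shutdown
pool.shutdown # calling it again just sets the flag again

after =
  begin
    pool.process(:update)
  rescue TinyPool::ShutdownError => e
    e.message
  end
```

The flag is checked only when work is submitted. The source shown on this page does not say what happens to tasks already in flight when shutdown is called.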