Class: CDC::Concurrent::Router

Inherits:
Object
Defined in:
lib/cdc/concurrent/router.rb

Overview

Routes CDC work items to the appropriate concurrent execution pool.

Router is intentionally small. It keeps Runtime focused on lifecycle while preserving the distinction between individual events, transaction envelopes, and event batches.

Instance Method Summary

Constructor Details

#initialize(processor_pool:, transaction_pool:) ⇒ void

Builds a router using existing processor and transaction pools.

Parameters:

  • processor_pool (ProcessorPool)

    Pool used for individual ChangeEvent work and event batches.

  • transaction_pool (TransactionPool)

    Pool used for TransactionEnvelope work.



# File 'lib/cdc/concurrent/router.rb', line 16

def initialize(processor_pool:, transaction_pool:)
  @processor_pool = processor_pool
  @transaction_pool = transaction_pool
end
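
Because the constructor only stores the two pools, any pair of objects can be injected, which is convenient in tests. The following is a minimal sketch, not the library's own setup code: `FakePool` is a hypothetical placeholder for ProcessorPool and TransactionPool, and the Router class is re-declared with the constructor body from the listing above so the snippet runs without the library loaded.

```ruby
# Re-declaration so the snippet is self-contained; the constructor body
# matches the source listing, everything else here is illustrative.
module CDC
  module Concurrent
    class Router
      def initialize(processor_pool:, transaction_pool:)
        @processor_pool = processor_pool
        @transaction_pool = transaction_pool
      end
    end
  end
end

# Hypothetical placeholder for the real pool classes.
FakePool = Struct.new(:name)

processor_pool = FakePool.new("processor")
transaction_pool = FakePool.new("transaction")

router = CDC::Concurrent::Router.new(
  processor_pool: processor_pool,
  transaction_pool: transaction_pool
)

# Both keywords are required, so leaving one out raises ArgumentError.
missing_keyword_message = nil
begin
  CDC::Concurrent::Router.new(processor_pool: processor_pool)
rescue ArgumentError => e
  missing_keyword_message = e.message
end
```

The router keeps references to the pools it is given and does not copy or wrap them, so their lifecycle stays with whoever created them.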

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult, Array<CDC::Core::ProcessorResult>

Dispatches a supported work item to its matching pool.

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope, Array<CDC::Core::ChangeEvent>)

    Work item to process.

Returns:

  • (CDC::Core::ProcessorResult, Array<CDC::Core::ProcessorResult>)

    Processing result for the supplied item.

Raises:

  • (UnsupportedWorkItemError)

    If item is not a ChangeEvent, a TransactionEnvelope, or an Array.



# File 'lib/cdc/concurrent/router.rb', line 27

def process(item)
  case item
  when CDC::Core::ChangeEvent
    @processor_pool.process(item)
  when CDC::Core::TransactionEnvelope
    @transaction_pool.process(item)
  when Array
    @processor_pool.process_many(item)
  else
    raise UnsupportedWorkItemError, "unsupported CDC work item: #{item.class}"
  end
end
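
The dispatch above can be exercised with stand-in objects. This is a sketch under stated assumptions rather than the library's real classes: `ChangeEvent` and `TransactionEnvelope` are reduced to hypothetical Structs, `RecordingPool` is a hypothetical pool that only records calls, and `UnsupportedWorkItemError` is assumed to live in `CDC::Concurrent` and descend from `StandardError`. The `process` body is the one from the listing.

```ruby
module CDC
  module Core
    # Hypothetical minimal stand-ins for the real value objects.
    ChangeEvent = Struct.new(:id)
    TransactionEnvelope = Struct.new(:events)
  end

  module Concurrent
    # Assumption: the real error's namespace and superclass may differ.
    class UnsupportedWorkItemError < StandardError; end

    class Router
      def initialize(processor_pool:, transaction_pool:)
        @processor_pool = processor_pool
        @transaction_pool = transaction_pool
      end

      def process(item)
        case item
        when CDC::Core::ChangeEvent
          @processor_pool.process(item)
        when CDC::Core::TransactionEnvelope
          @transaction_pool.process(item)
        when Array
          @processor_pool.process_many(item)
        else
          raise UnsupportedWorkItemError, "unsupported CDC work item: #{item.class}"
        end
      end
    end
  end
end

# Hypothetical pool that records which method was called.
class RecordingPool
  attr_reader :calls

  def initialize
    @calls = []
  end

  def process(item)
    @calls << [:process, item]
    :ok
  end

  def process_many(items)
    @calls << [:process_many, items]
    items.map { :ok }
  end
end

processor_pool = RecordingPool.new
transaction_pool = RecordingPool.new
router = CDC::Concurrent::Router.new(
  processor_pool: processor_pool,
  transaction_pool: transaction_pool
)

events = [CDC::Core::ChangeEvent.new(1), CDC::Core::ChangeEvent.new(2)]

single_result   = router.process(events.first)                              # processor pool, #process
envelope_result = router.process(CDC::Core::TransactionEnvelope.new(events)) # transaction pool, #process
batch_result    = router.process(events)                                    # processor pool, #process_many

# `when Array` checks only the container class, so an empty array
# (or one holding other objects) is still routed to #process_many.
router.process([])

error_message = nil
begin
  router.process("not a work item")
rescue CDC::Concurrent::UnsupportedWorkItemError => e
  error_message = e.message
end
```

Since the `Array` branch does not inspect elements, callers that need the `Array<CDC::Core::ChangeEvent>` contract enforced would have to validate the batch before handing it to the router.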