Class: CDC::Parallel::Router

Inherits:
Object
Defined in:
lib/cdc/parallel/router.rb

Overview

Routes normalized `cdc-core` work items to the matching parallel runtime primitive.

`Router` is deliberately small. It does not inspect source-specific payloads, apply filters, decode database values, or decide scheduling policy. Its only responsibility is to look at the already-normalized `cdc-core` object shape and forward it to the pool that knows how to process that shape.

Examples:

Routing a single event

router.process(change_event)

Routing a transaction envelope

router.process(transaction_envelope)
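The two calls above can be tried end to end with the sketch below. It is an illustration under stated assumptions, not the library's own setup code: the `Struct` definitions stand in for the real `cdc-core` classes, `RecordingPool` is a hypothetical double for `ProcessorPool` and `TransactionPool`, and placing `UnsupportedWorkItemError` under `CDC::Parallel` is a guess. The `Router` body mirrors the source shown on this page.

```ruby
# Stand-ins for the real cdc-core value objects (assumed shapes, illustration only).
module CDC
  module Core
    ChangeEvent = Struct.new(:table, :operation)
    TransactionEnvelope = Struct.new(:events)
    ProcessorResult = Struct.new(:status, :handled_by)
  end

  module Parallel
    # Assumed namespace for the error class; the page does not state it.
    class UnsupportedWorkItemError < StandardError; end

    # Same dispatch as the Router source listed on this page.
    class Router
      def initialize(processor_pool:, transaction_pool:)
        @processor_pool = processor_pool
        @transaction_pool = transaction_pool
      end

      def process(item)
        case item
        when CDC::Core::ChangeEvent
          @processor_pool.process(item)
        when CDC::Core::TransactionEnvelope
          @transaction_pool.process(item)
        else
          raise UnsupportedWorkItemError, "unsupported CDC work item: #{item.class}"
        end
      end
    end
  end
end

# Hypothetical pool double: records each item and returns a result naming itself.
class RecordingPool
  attr_reader :received

  def initialize(name)
    @name = name
    @received = []
  end

  def process(item)
    @received << item
    CDC::Core::ProcessorResult.new(:ok, @name)
  end
end

processor_pool = RecordingPool.new(:processor_pool)
transaction_pool = RecordingPool.new(:transaction_pool)
router = CDC::Parallel::Router.new(
  processor_pool: processor_pool,
  transaction_pool: transaction_pool
)

change_event = CDC::Core::ChangeEvent.new("orders", :insert)
transaction_envelope = CDC::Core::TransactionEnvelope.new([change_event])

event_result = router.process(change_event)            # handled by processor_pool
envelope_result = router.process(transaction_envelope) # handled by transaction_pool
```

Because the router only ever calls `process(item)` on each pool, any object responding to that method can be injected, which keeps the router easy to exercise in isolation.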

Constructor Details

#initialize(processor_pool:, transaction_pool:) ⇒ void

Create a router for event and transaction work items.

Parameters:

  • processor_pool (ProcessorPool)

    Pool used for individual `CDC::Core::ChangeEvent` objects.

  • transaction_pool (TransactionPool)

    Pool used for `CDC::Core::TransactionEnvelope` objects.

# File 'lib/cdc/parallel/router.rb', line 31

def initialize(processor_pool:, transaction_pool:)
  @processor_pool = processor_pool
  @transaction_pool = transaction_pool
end

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult

Forward a supported CDC work item to the matching pool and return that pool's result.

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope)

    Normalized CDC work item.

Returns:

  • (CDC::Core::ProcessorResult)

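Raises:

  • (UnsupportedWorkItemError)

    If `item` is neither a `CDC::Core::ChangeEvent` nor a `CDC::Core::TransactionEnvelope`.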


# File 'lib/cdc/parallel/router.rb', line 43

def process(item)
  case item
  when CDC::Core::ChangeEvent
    @processor_pool.process(item)
  when CDC::Core::TransactionEnvelope
    @transaction_pool.process(item)
  else
    raise UnsupportedWorkItemError, "unsupported CDC work item: #{item.class}"
  end
end
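
The `case`/`when` dispatch above matches with `Module#===`, so instances of subclasses are routed like their parent class, and anything else reaches the `else` branch. The reduced sketch below shows both paths. The empty classes are stand-ins for the `cdc-core` types, `AuditedChangeEvent` and `route` are hypothetical names introduced only for illustration, and `route` returns a label where the real method would call a pool.

```ruby
# Stand-ins for the cdc-core classes (the real ones are defined by cdc-core).
module CDC
  module Core
    class ChangeEvent; end
    class TransactionEnvelope; end
  end
end

# Defined at top level here for brevity; the real namespace is not shown on this page.
class UnsupportedWorkItemError < StandardError; end

# Hypothetical subclass, used to show that `when` also matches descendants.
class AuditedChangeEvent < CDC::Core::ChangeEvent; end

# Reduced form of Router#process: returns a label instead of calling a pool.
def route(item)
  case item
  when CDC::Core::ChangeEvent then :processor_pool
  when CDC::Core::TransactionEnvelope then :transaction_pool
  else
    raise UnsupportedWorkItemError, "unsupported CDC work item: #{item.class}"
  end
end

# A subclass instance is routed like a plain ChangeEvent.
subclass_target = route(AuditedChangeEvent.new)

# A raw Hash has not been normalized, so it is rejected.
error_message =
  begin
    route({ "table" => "orders" })
    nil
  rescue UnsupportedWorkItemError => e
    e.message
  end
```

A caller that may receive un-normalized input can rescue the error as shown and decide whether to drop, log, or re-raise; the router itself makes no such decision.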