Class: CDC::Concurrent::Runtime

Inherits:
Object
Defined in:
lib/cdc/concurrent/runtime.rb

Overview

High-level concurrent runtime facade for cdc-core processors.

Runtime owns the public lifecycle for cdc-concurrent. It wires together the event processor pool, transaction pool, and router so callers can submit individual events, batches, or transaction envelopes through one object.

Constructor Details

#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void

Builds a concurrent runtime for one processor.

Parameters:

  • processor (CDC::Core::Processor)

    Processor instance that declares concurrent_safe!.

  • concurrency (Integer) (defaults to: 100)

    Maximum number of Async tasks allowed to run at once.

  • timeout (Float, nil) (defaults to: nil)

    Optional per-event processing timeout in seconds.

  • preserve_order (Boolean) (defaults to: true)

    Whether batch results should preserve input order.

Raises:



# File 'lib/cdc/concurrent/runtime.rb', line 19

def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
  @processor_pool = ProcessorPool.new(processor:, concurrency:, timeout:, preserve_order:)
  @transaction_pool = TransactionPool.new(processor:, concurrency:, timeout:, preserve_order:)
  @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool)
  @shutdown = false
end
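
The concurrency and preserve_order options can be illustrated in isolation. The following is a minimal sketch, not the library's implementation: it assumes plain stdlib threads in place of Async tasks, and `bounded_map` is a hypothetical helper that does not exist in cdc-concurrent. It caps the number of workers and writes each result back to its input position, so the output order matches the input order even when items finish out of order.

```ruby
# Hypothetical helper, not part of cdc-concurrent. Uses threads as a
# stand-in for Async tasks (an assumption made to keep the sketch stdlib-only).
def bounded_map(items, concurrency:, &block)
  results = Array.new(items.size)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # pop returns nil once the closed queue is drained

  worker_count = [concurrency, items.size].min
  workers = Array.new(worker_count) do
    Thread.new do
      while (pair = queue.pop)
        item, index = pair
        # Writing by index keeps results in input order.
        results[index] = block.call(item)
      end
    end
  end

  workers.each(&:join)
  results.freeze
end

# The slowest item comes first, yet its result still lands in slot 0.
squares = bounded_map([3, 1, 2], concurrency: 2) do |n|
  sleep(n * 0.01)
  n * n
end
```

Here `squares` is a frozen array whose elements line up with the input, which is the behavior the preserve_order description suggests for batch results.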

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult, Array<CDC::Core::ProcessorResult>

Processes a supported work item through the runtime router.

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope, Array<CDC::Core::ChangeEvent>)

    Work item to process.

Returns:

  • (CDC::Core::ProcessorResult, Array<CDC::Core::ProcessorResult>)

    Processing result for the supplied item.

Raises:

  • (ShutdownError)

    If the runtime has already been shut down.

# File 'lib/cdc/concurrent/runtime.rb', line 33

def process(item)
  raise ShutdownError, "runtime has been shut down" if @shutdown

  @router.process(item)
end
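
The Router's implementation is not shown on this page. As a rough sketch of what type-based dispatch over the three supported item kinds might look like, the example below uses stand-in Struct types instead of the real CDC::Core classes. `DemoEvent`, `DemoTransaction`, and `route_for` are hypothetical names introduced only for illustration.

```ruby
# Stand-ins for CDC::Core::ChangeEvent and CDC::Core::TransactionEnvelope;
# the real classes are not reproduced here.
DemoEvent = Struct.new(:id)
DemoTransaction = Struct.new(:events)

# Hypothetical dispatch: choose a route from the type of the work item.
def route_for(item)
  case item
  when DemoEvent then :event
  when DemoTransaction then :transaction
  when Array then :batch
  else raise ArgumentError, "unsupported work item: #{item.class}"
  end
end

single = route_for(DemoEvent.new(1))
batch = route_for([DemoEvent.new(1), DemoEvent.new(2)])
transaction = route_for(DemoTransaction.new([DemoEvent.new(3)]))
```

Rejecting unknown item types up front is a design choice of this sketch. How the real router treats unsupported items is not documented here.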

#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>

Processes a batch of change events.

Parameters:

  • events (Array<CDC::Core::ChangeEvent>)

    Events to process through the processor pool.

Returns:

  • (Array<CDC::Core::ProcessorResult>)

    Frozen array of normalized processor results.



# File 'lib/cdc/concurrent/runtime.rb', line 43

def process_many(events)
  process(events)
end

#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult

Processes a transaction envelope as one logical work item.

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

    Transaction whose events should be processed together.

Returns:

  • (CDC::Core::ProcessorResult)

    A success result containing the event results, or a failure result for the first failed event.



# File 'lib/cdc/concurrent/runtime.rb', line 52

def process_transaction(transaction)
  process(transaction)
end
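
The return value described above can be read as a "first failure wins" aggregation. The sketch below shows one way that rule could be expressed. It is an assumption about the semantics, not the TransactionPool's code: `DemoResult` is a stand-in for CDC::Core::ProcessorResult (whose API is not shown on this page), and `summarize_transaction` is a hypothetical helper.

```ruby
# Stand-in for CDC::Core::ProcessorResult; the real class may look different.
DemoResult = Struct.new(:ok, :payload) do
  def success?
    ok
  end
end

# Hypothetical aggregation: return the first failed event result if there is
# one, otherwise a single success result wrapping every event result.
def summarize_transaction(event_results)
  first_failure = event_results.find { |result| !result.success? }
  first_failure || DemoResult.new(true, event_results.dup.freeze)
end

ok_a = DemoResult.new(true, "insert users/1")
bad = DemoResult.new(false, "update users/1 failed")
ok_b = DemoResult.new(true, "delete users/2")

all_ok = summarize_transaction([ok_a, ok_b])
failed = summarize_transaction([ok_a, bad, ok_b])
```

In this sketch `all_ok` carries both event results, while `failed` is the failing result itself, so the caller sees exactly which event broke the transaction.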

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the runtime and its underlying pools.



# File 'lib/cdc/concurrent/runtime.rb', line 59

def shutdown
  return if @shutdown

  @shutdown = true
  @processor_pool.shutdown
  @transaction_pool.shutdown
end
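
The listing above makes shutdown idempotent, and #process refuses new work once the flag is set. A self-contained sketch of that lifecycle follows, assuming stub collaborators: `DemoPool`, `DemoRuntime`, and `DemoShutdownError` are hypothetical stand-ins, not classes from cdc-concurrent.

```ruby
# Stand-in for the library's ShutdownError.
class DemoShutdownError < StandardError; end

# Stub pool that only counts how often it is shut down.
class DemoPool
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
  end
end

# Mirrors the guard-and-flag pattern from the listings on this page.
class DemoRuntime
  def initialize(pools)
    @pools = pools
    @shutdown = false
  end

  def process(item)
    raise DemoShutdownError, "runtime has been shut down" if @shutdown

    item # a real runtime would hand the item to its router here
  end

  def shutdown
    return if @shutdown

    @shutdown = true
    @pools.each(&:shutdown)
  end
end

pool = DemoPool.new
runtime = DemoRuntime.new([pool])
runtime.shutdown
runtime.shutdown # second call returns early and leaves the pool alone
```

Callers that cannot rule out late submissions may want to rescue the shutdown error around #process rather than check state themselves, since this page documents no public predicate for the shutdown flag.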