Class: CDC::Parallel::Runtime

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/parallel/runtime.rb

Overview

Note:

‘Runtime` is an execution facade, not a source adapter. It expects work that has already been normalized into `cdc-core` primitives.

High-level Ractor runtime facade for ‘cdc-core` processors.

‘Runtime` is the primary public entry point for applications that want to execute normalized CDC work items with `cdc-parallel`. It wires together a ProcessorPool, a TransactionPool, and a Router so callers can submit either a single `CDC::Core::ChangeEvent` or a `CDC::Core::TransactionEnvelope` through one object.

Use this class when you want the default cdc-parallel behavior:

  • validate that the processor declared ‘ractor_safe!`

  • boot a fixed set of worker Ractors

  • route events and transaction envelopes to the right pool

  • return ‘CDC::Core::ProcessorResult` objects

  • shut down all worker resources together

Examples:

Processing a change event

runtime = CDC::Parallel::Runtime.new(
  processor: AnalyticsProcessor.new,
  size: 4,
  timeout: 5
)

result = runtime.process(change_event)
result.success? #=> true

runtime.shutdown

Processing a transaction envelope

result = runtime.process_transaction(transaction)

See Also:

Instance Method Summary collapse

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Create a runtime with event and transaction pools.

Parameters:

  • processor (CDC::Core::Processor)

    Ractor-safe processor used for both event and transaction processing.

  • size (Integer) (defaults to: Etc.nprocessors)

    Number of worker Ractors per internal pool.

  • timeout (Numeric, nil) (defaults to: nil)

    Optional timeout in seconds for result collection and shutdown waits.

Raises:

  • (UnsafeProcessorError)

    Raised when the processor class has not declared ‘ractor_safe!`.

  • (ArgumentError)

    Raised when size or timeout is invalid.



56
57
58
59
60
61
# File 'lib/cdc/parallel/runtime.rb', line 56

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  @processor_pool = ProcessorPool.new(processor:, size:, timeout:)
  @transaction_pool = TransactionPool.new(processor:, size:, timeout:)
  @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool)
  @shutdown = false
end

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult

Process a supported normalized CDC work item.

Supported items are ‘CDC::Core::ChangeEvent` and `CDC::Core::TransactionEnvelope`. Unsupported objects raise UnsupportedWorkItemError from the router.

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope)

    Normalized CDC work item.

Returns:

  • (CDC::Core::ProcessorResult)

Raises:



76
77
78
79
80
# File 'lib/cdc/parallel/runtime.rb', line 76

def process(item)
  raise ShutdownError, "runtime has been shut down" if @shutdown

  @router.process(item)
end

#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult

Process a transaction envelope.

This method is a readability alias for transaction-oriented call sites. It delegates to #process, so it has the same validation, shutdown, and result behavior.

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


90
91
92
# File 'lib/cdc/parallel/runtime.rb', line 90

def process_transaction(transaction)
  process(transaction)
end

#shutdownvoid

This method returns an undefined value.

Shut down all runtime resources.

Shutdown is idempotent and cascades to the internal event and transaction pools. After shutdown, #process raises ShutdownError.



100
101
102
103
104
105
106
# File 'lib/cdc/parallel/runtime.rb', line 100

def shutdown
  return if @shutdown

  @shutdown = true
  @processor_pool.shutdown
  @transaction_pool.shutdown
end