Class: CDC::Parallel::Runtime
- Inherits: Object
  - Object
  - CDC::Parallel::Runtime
- Defined in: lib/cdc/parallel/runtime.rb
Overview
`Runtime` is an execution facade, not a source adapter. It expects work that has already been normalized into `cdc-core` primitives.
High-level Ractor runtime facade for `cdc-core` processors.
`Runtime` is the primary public entry point for applications that want to execute normalized CDC work items with `cdc-parallel`. It wires together a ProcessorPool, a TransactionPool, and a Router so callers can submit either a single `CDC::Core::ChangeEvent` or a `CDC::Core::TransactionEnvelope` through one object.
Use this class when you want the default cdc-parallel behavior:
- validate that the processor declared `ractor_safe!`
- boot a fixed set of worker Ractors
- route events and transaction envelopes to the right pool
- return `CDC::Core::ProcessorResult` objects
- shut down all worker resources together
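Because the runtime owns worker resources that are released together, call sites usually want shutdown to run even when processing raises. The following is a minimal sketch of that call pattern, not part of `cdc-parallel`: `EchoRuntime` and `with_runtime` are hypothetical names, and the stand-in only mimics the `process`/`shutdown` surface documented here without booting any Ractors.

```ruby
# Hypothetical stand-in exposing the same two public methods as Runtime.
# It echoes items back instead of dispatching them to worker Ractors.
class EchoRuntime
  attr_reader :closed

  def initialize
    @closed = false
  end

  def process(item)
    "processed #{item}"
  end

  def shutdown
    @closed = true
  end
end

# Hypothetical helper: yield the runtime, then always shut it down,
# whether the block returns normally or raises.
def with_runtime(runtime)
  yield runtime
ensure
  runtime.shutdown
end

runtime = EchoRuntime.new
results = with_runtime(runtime) do |rt|
  %w[event-1 event-2].map { |item| rt.process(item) }
end
```

With the real class, the same shape would presumably apply by passing `CDC::Parallel::Runtime.new(processor: ...)` in place of the stand-in.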
Instance Method Summary
- #initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void (constructor)
  Create a runtime with event and transaction pools.
- #process(item) ⇒ CDC::Core::ProcessorResult
  Process a supported normalized CDC work item.
- #process_transaction(transaction) ⇒ CDC::Core::ProcessorResult
  Process a transaction envelope.
- #shutdown ⇒ void
  Shut down all runtime resources.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
Create a runtime with event and transaction pools.
# File 'lib/cdc/parallel/runtime.rb', line 56
def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  @processor_pool = ProcessorPool.new(processor:, size:, timeout:)
  @transaction_pool = TransactionPool.new(processor:, size:, timeout:)
  @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool)
  @shutdown = false
end
Instance Method Details
#process(item) ⇒ CDC::Core::ProcessorResult
Process a supported normalized CDC work item.
Supported items are `CDC::Core::ChangeEvent` and `CDC::Core::TransactionEnvelope`. Unsupported objects raise UnsupportedWorkItemError from the router.
# File 'lib/cdc/parallel/runtime.rb', line 76
def process(item)
  raise ShutdownError, "runtime has been shut down" if @shutdown
  @router.process(item)
end
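The router's source is not shown on this page, so the following is only a sketch of how type-based routing of this kind could work. Everything in it is a stand-in: `ChangeEvent` and `TransactionEnvelope` are plain Structs rather than the `cdc-core` classes, `RecordingPool` and `SketchRouter` are hypothetical, and the error class is redefined locally so the example runs on its own.

```ruby
# Stand-in work item types for illustration; the real ones live in cdc-core.
ChangeEvent = Struct.new(:table, :payload)
TransactionEnvelope = Struct.new(:events)

# Local stand-in for the error raised on unsupported items.
class UnsupportedWorkItemError < StandardError; end

# Hypothetical pool that records what it receives instead of using Ractors.
class RecordingPool
  attr_reader :received

  def initialize(name)
    @name = name
    @received = []
  end

  def process(item)
    @received << item
    "#{@name} handled #{item.class.name}"
  end
end

# Dispatches on the work item's class and rejects anything else.
class SketchRouter
  def initialize(processor_pool:, transaction_pool:)
    @processor_pool = processor_pool
    @transaction_pool = transaction_pool
  end

  def process(item)
    case item
    when ChangeEvent then @processor_pool.process(item)
    when TransactionEnvelope then @transaction_pool.process(item)
    else
      raise UnsupportedWorkItemError, "unsupported work item: #{item.class}"
    end
  end
end

events = RecordingPool.new("event pool")
transactions = RecordingPool.new("transaction pool")
router = SketchRouter.new(processor_pool: events, transaction_pool: transactions)

router.process(ChangeEvent.new("users", "row-1"))
router.process(TransactionEnvelope.new([]))
```

Passing any other object, such as a String, to `router.process` raises the stand-in `UnsupportedWorkItemError`.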
#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult
Process a transaction envelope.
This method is a readability alias for transaction-oriented call sites. It delegates to #process, so it has the same validation, shutdown, and result behavior.
# File 'lib/cdc/parallel/runtime.rb', line 90
def process_transaction(transaction)
  process(transaction)
end
#shutdown ⇒ void
This method returns an undefined value.
Shut down all runtime resources.
Shutdown is idempotent and cascades to the internal event and transaction pools. After shutdown, #process raises ShutdownError.
# File 'lib/cdc/parallel/runtime.rb', line 100
def shutdown
  return if @shutdown
  @shutdown = true
  @processor_pool.shutdown
  @transaction_pool.shutdown
end
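The guard flag in the listing above is what makes repeated calls safe: only the first call reaches the pools. A small self-contained sketch of that behavior follows. `CountingPool` and `SketchRuntime` are hypothetical stand-ins that mirror the pattern without Ractors, and `ShutdownError` is redefined locally for the example.

```ruby
# Local stand-in for the error raised after shutdown.
class ShutdownError < StandardError; end

# Hypothetical pool that counts how often it is asked to shut down.
class CountingPool
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
  end
end

# Mirrors the guard-flag pattern: the flag is set once, then every
# later shutdown call returns early and every process call raises.
class SketchRuntime
  def initialize(pools)
    @pools = pools
    @shutdown = false
  end

  def process(item)
    raise ShutdownError, "runtime has been shut down" if @shutdown
    item
  end

  def shutdown
    return if @shutdown
    @shutdown = true
    @pools.each(&:shutdown)
  end
end

pools = [CountingPool.new, CountingPool.new]
runtime = SketchRuntime.new(pools)
runtime.shutdown
runtime.shutdown # returns early; the pools are not touched again
```

After these two calls each stand-in pool has been shut down exactly once, and any further `runtime.process` call raises the stand-in `ShutdownError`.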