Class: CDC::Parallel::TransactionPool
- Inherits:
-
Object
- Object
- CDC::Parallel::TransactionPool
- Defined in:
- lib/cdc/parallel/transaction_pool.rb
Overview
This class preserves the transaction as a result boundary. More advanced ordering, checkpointing, retry, and atomic sink semantics belong to higher-level runtime/sink contracts.
Processes a ‘CDC::Core::TransactionEnvelope` as one transaction-oriented work unit.
‘TransactionPool` uses ProcessorPool to process the events inside an envelope and then collapses the event-level results into one `CDC::Core::ProcessorResult` for the whole transaction.
This class preserves the transaction boundary at the API level: callers submit a transaction envelope and receive a single success or failure result. Event results inside the transaction are still produced by the configured processor and are returned as the success value when every event succeeds.
Instance Method Summary collapse
-
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
constructor
Create a transaction pool.
-
#process(transaction) ⇒ CDC::Core::ProcessorResult
Process all events inside a transaction envelope.
-
#shutdown ⇒ void
Shut down worker resources.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void
Create a transaction pool.
44 45 46 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 44 def initialize(processor:, size: Etc.nprocessors, timeout: nil) @processor_pool = ProcessorPool.new(processor:, size:, timeout:) end |
Instance Method Details
#process(transaction) ⇒ CDC::Core::ProcessorResult
Process all events inside a transaction envelope.
The returned result is successful only when every event result succeeds. If any event fails, the transaction result is a failure using the first failure error and the complete event-result list as context.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 59 def process(transaction) results = @processor_pool.process_many(transaction.events).freeze failure = results.find(&:failure?) if failure error = failure.error || ProcessorExecutionError.new( original_class: "CDC::Core::ProcessorResult", original_message: "failed processor result did not include an error" ) return CDC::Core::ProcessorResult.failure(error, event: results) end CDC::Core::ProcessorResult.success(results) end |
#shutdown ⇒ void
This method returns an undefined value.
Shut down worker resources.
Delegates to the underlying ProcessorPool. Shutdown is idempotent because the underlying pool is idempotent.
81 82 83 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 81 def shutdown @processor_pool.shutdown end |