Class: CDC::Concurrent::TransactionPool
- Inherits:
-
Object
- Object
- CDC::Concurrent::TransactionPool
- Defined in:
- lib/cdc/concurrent/transaction_pool.rb
Overview
Processes TransactionEnvelope events as one logical unit.
TransactionPool delegates the envelope’s events to ProcessorPool while preserving the envelope-level success/failure contract. Event work may run concurrently, but the transaction result is returned as a single CDC::Core::ProcessorResult.
Instance Method Summary collapse
-
#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void
constructor
Builds a transaction pool backed by an Async processor pool.
-
#process(transaction) ⇒ CDC::Core::ProcessorResult
Processes all events inside a transaction envelope.
-
#shutdown ⇒ void
Shuts down the underlying processor pool.
Constructor Details
#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void
Builds a transaction pool backed by an Async processor pool.
20 21 22 |
# File 'lib/cdc/concurrent/transaction_pool.rb', line 20 def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) @processor_pool = ProcessorPool.new(processor:, concurrency:, timeout:, preserve_order:) end |
Instance Method Details
#process(transaction) ⇒ CDC::Core::ProcessorResult
Processes all events inside a transaction envelope.
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/cdc/concurrent/transaction_pool.rb', line 29 def process(transaction) results = @processor_pool.process_many(transaction.events).freeze failure = results.find(&:failure?) if failure error = failure.error || Error.new("transaction event failed without an error") return CDC::Core::ProcessorResult.failure(error, event: results) end CDC::Core::ProcessorResult.success(results) end |
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the underlying processor pool.
45 46 47 |
# File 'lib/cdc/concurrent/transaction_pool.rb', line 45 def shutdown @processor_pool.shutdown end |