Class: CDC::Parallel::TransactionPool
- Inherits:
-
Object
- Object
- CDC::Parallel::TransactionPool
- Defined in:
- lib/cdc/parallel/transaction_pool.rb,
sig/cdc/parallel/transaction_pool.rbs
Overview
Processes a TransactionEnvelope as a single ordering-preserving unit.
Instance Method Summary collapse
- #initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ Object constructor
-
#process(transaction) ⇒ CDC::Core::ProcessorResult
Process all events inside a transaction envelope.
-
#shutdown ⇒ void
Shut down worker resources.
Constructor Details
#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 57 def initialize( processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true ) @processor_pool = ProcessorPool.new( processor:, size:, timeout:, supervision:, max_respawns:, respawn_window:, respawn_cooldown:, manage_lifecycle: ) end |
Instance Method Details
#process(transaction) ⇒ CDC::Core::ProcessorResult
Process all events inside a transaction envelope.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 90 def process(transaction) results = @processor_pool.process_many(transaction.events).freeze failure = results.find(&:failure?) if failure error = failure.error || ProcessorExecutionError.new( original_class: "CDC::Core::ProcessorResult", original_message: "failed processor result did not include an error" ) return CDC::Core::ProcessorResult.failure(error, event: results) end CDC::Core::ProcessorResult.success(results) end |
#shutdown ⇒ void
This method returns an undefined value.
Shut down worker resources.
112 113 114 |
# File 'lib/cdc/parallel/transaction_pool.rb', line 112 def shutdown @processor_pool.shutdown end |