Class: CDC::Concurrent::TransactionPool

Inherits:
Object
Defined in:
lib/cdc/concurrent/transaction_pool.rb

Overview

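Processes the events of a TransactionEnvelope as a single unit. The work is delegated to an internal ProcessorPool, and event order is preserved by default (preserve_order: true).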

Constructor Details

#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ TransactionPool

Returns a new instance of TransactionPool.

Parameters:

  • processor (CDC::Core::Processor)
  • concurrency (Integer) (defaults to: 100)
  • timeout (Float, nil) (defaults to: nil)
  • preserve_order (Boolean) (defaults to: true)


# File 'lib/cdc/concurrent/transaction_pool.rb', line 11

def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
  @processor_pool = ProcessorPool.new(processor:, concurrency:, timeout:, preserve_order:)
end

Instance Method Details

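#process(transaction) ⇒ CDC::Core::ProcessorResult

Passes every event in the transaction to the underlying ProcessorPool and collapses the per-event results into one ProcessorResult. If any event fails, the return value is a failure carrying the error of the first failed result, with the frozen array of all per-event results passed as event:. Otherwise it is a success wrapping that same frozen array.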

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/concurrent/transaction_pool.rb', line 17

def process(transaction)
  results = @processor_pool.process_many(transaction.events).freeze
  failure = results.find(&:failure?)

  return CDC::Core::ProcessorResult.failure(failure.error, event: results) if failure

  CDC::Core::ProcessorResult.success(results)
end
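
The aggregation performed by #process can be tried in isolation. The sketch below is not the library's code: StubResult, StubPool, and aggregate are hypothetical stand-ins that mirror only the calls visible in the source above (failure?, error, success, failure, process_many), and the stub pool runs events sequentially rather than concurrently.

```ruby
# Hypothetical stand-in for CDC::Core::ProcessorResult (assumed shape, not the real class).
StubResult = Struct.new(:ok, :value, :error, :event, keyword_init: true) do
  def success?
    ok
  end

  def failure?
    !ok
  end

  def self.success(value)
    new(ok: true, value: value)
  end

  def self.failure(error, event: nil)
    new(ok: false, error: error, event: event)
  end
end

# Hypothetical stand-in for ProcessorPool: handles events one by one, in order,
# and converts a raised exception into a failure result for that event.
class StubPool
  def initialize(&handler)
    @handler = handler
  end

  def process_many(events)
    events.map do |event|
      begin
        StubResult.success(@handler.call(event))
      rescue StandardError => e
        StubResult.failure(e, event: event)
      end
    end
  end
end

# Same shape as TransactionPool#process: first failure wins, otherwise wrap all results.
def aggregate(results)
  results = results.freeze
  failure = results.find(&:failure?)
  return StubResult.failure(failure.error, event: results) if failure

  StubResult.success(results)
end

pool = StubPool.new do |event|
  raise ArgumentError, "bad event" if event == :bad

  event.to_s.upcase
end

ok = aggregate(pool.process_many(%i[insert update]))
failed = aggregate(pool.process_many(%i[insert bad update]))
```

Here ok is a success whose value holds one result per event, while failed reports the ArgumentError raised for :bad and still exposes all three per-event results through event.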

#shutdown ⇒ void

Shuts down the underlying ProcessorPool. This method returns an undefined value.



# File 'lib/cdc/concurrent/transaction_pool.rb', line 27

def shutdown
  @processor_pool.shutdown
end
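
Because #shutdown must be called explicitly, one possible pattern is to wrap pool usage in a helper that shuts the pool down in an ensure clause. The sketch below is only an illustration under stated assumptions: StubTransactionPool and with_pool are hypothetical names, and the stub's process simply counts events instead of doing real work.

```ruby
# Hypothetical stand-in for TransactionPool exposing the same two methods.
class StubTransactionPool
  attr_reader :shut_down

  def initialize
    @shut_down = false
  end

  # Placeholder work: returns the number of events it was given.
  def process(events)
    events.size
  end

  def shutdown
    @shut_down = true
    nil
  end
end

# Hypothetical helper: yields the pool and shuts it down even if the block raises.
def with_pool(pool)
  yield pool
ensure
  pool.shutdown
end

pool = StubTransactionPool.new
count = with_pool(pool) { |p| p.process(%i[insert update]) }

failing_pool = StubTransactionPool.new
begin
  with_pool(failing_pool) { |_p| raise "processing failed" }
rescue RuntimeError
  # The error still propagates to the caller; shutdown has already run.
end
```

The ensure clause runs on both the normal and the error path, so the pool is released without each call site having to remember it.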