Class: CDC::Concurrent::TransactionPool

Inherits:
Object
Defined in:
lib/cdc/concurrent/transaction_pool.rb

Overview

Processes TransactionEnvelope events as one logical unit.

TransactionPool delegates the envelope’s events to ProcessorPool while preserving the envelope-level success/failure contract. Event work may run concurrently, but the transaction result is returned as a single CDC::Core::ProcessorResult.
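The contract described above can be illustrated with a minimal standard-library sketch. It is not the library's implementation: `Thread` stands in for an Async task, and `SketchResult`, `process_events`, and `process_transaction` are hypothetical names introduced only for this example. `SketchResult` mirrors the calls that appear on this page (`success`, `failure`, `failure?`, `error`); its `value` and `event` readers are assumptions.

```ruby
# Hypothetical stand-in for CDC::Core::ProcessorResult (not the real class).
SketchResult = Struct.new(:ok, :value, :error, :event, keyword_init: true) do
  def self.success(value)
    new(ok: true, value: value)
  end

  def self.failure(error, event: nil)
    new(ok: false, error: error, event: event)
  end

  def failure?
    !ok
  end
end

# Runs one thread per event (a Thread stands in for an Async task) and
# returns the per-event results in the original event order.
def process_events(events, &handler)
  threads = events.map do |event|
    Thread.new do
      SketchResult.success(handler.call(event))
    rescue StandardError => e
      SketchResult.failure(e, event: event)
    end
  end
  threads.map(&:value) # Thread#value joins, so the order follows `events`
end

# Collapses the per-event results into one envelope-level result.
def process_transaction(events, &handler)
  results = process_events(events, &handler).freeze
  failure = results.find(&:failure?)
  return SketchResult.failure(failure.error, event: results) if failure

  SketchResult.success(results)
end

ok  = process_transaction([1, 2, 3]) { |n| n * 10 }
bad = process_transaction([1, 2, 3]) { |n| n == 2 ? raise(ArgumentError, "bad event #{n}") : n }
```

In this sketch, `ok` wraps three per-event successes, while `bad` is a single failure whose error comes from the first failed event and whose `event` field holds every per-event result.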

Constructor Details

#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ void

Builds a transaction pool backed by an Async processor pool.

Parameters:

  • processor (CDC::Core::Processor)

    Processor instance that declares concurrent_safe!.

  • concurrency (Integer) (defaults to: 100)

    Maximum number of Async tasks allowed to run at once.

  • timeout (Float, nil) (defaults to: nil)

    Optional per-event processing timeout in seconds.

  • preserve_order (Boolean) (defaults to: true)

    Whether event results should preserve transaction event order.

Raises:



# File 'lib/cdc/concurrent/transaction_pool.rb', line 20

def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
  @processor_pool = ProcessorPool.new(processor:, concurrency:, timeout:, preserve_order:)
end
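The constructor forwards its keyword arguments using Ruby's omitted-value shorthand (available from Ruby 3.1), where `processor:` alone means `processor: processor`. The sketch below shows that forwarding pattern in isolation. `SketchInner` and `SketchOuter` are hypothetical classes for illustration, not part of the library.

```ruby
# Hypothetical inner class that records the options it receives.
class SketchInner
  attr_reader :options

  def initialize(processor:, concurrency:, timeout:, preserve_order:)
    @options = { processor:, concurrency:, timeout:, preserve_order: }
  end
end

# Hypothetical outer class that applies defaults, then forwards everything.
class SketchOuter
  attr_reader :inner

  def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
    # `processor:` is shorthand for `processor: processor` (Ruby 3.1+).
    @inner = SketchInner.new(processor:, concurrency:, timeout:, preserve_order:)
  end
end

outer = SketchOuter.new(processor: :my_processor, timeout: 2.5)
```

Here the caller overrides only `timeout`; the remaining options reach the inner object with the defaults declared on the outer constructor.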

Instance Method Details

#process(transaction) ⇒ CDC::Core::ProcessorResult

Processes all events inside a transaction envelope.

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

    Transaction envelope whose events should be processed.

Returns:

  • (CDC::Core::ProcessorResult)

    Success result containing the event results, or a failure result carrying the error of the first failed event.



# File 'lib/cdc/concurrent/transaction_pool.rb', line 29

def process(transaction)
  results = @processor_pool.process_many(transaction.events).freeze
  failure = results.find(&:failure?)

  if failure
    error = failure.error || Error.new("transaction event failed without an error")

    return CDC::Core::ProcessorResult.failure(error, event: results)
  end

  CDC::Core::ProcessorResult.success(results)
end
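One consequence of the listing above is that a failure result reports only the first failed event's error, while its `event:` argument receives the full frozen results array, so later failures remain inspectable there. The caller-side sketch below assumes the result exposes an `event` reader, which this page does not confirm. `TxResult` and `describe` are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for the objects involved; only `failure?` and `error`
# appear in the source above, the `event` reader is an assumption.
TxResult = Struct.new(:failed, :error, :event, keyword_init: true) do
  def failure?
    failed
  end
end

# Summarises a transaction-level result, for example for logging.
def describe(result)
  return "transaction ok" unless result.failure?

  failed = result.event.count(&:failure?)
  "transaction failed: #{result.error.message} (#{failed} of #{result.event.size} events failed)"
end

event_results = [
  TxResult.new(failed: false),
  TxResult.new(failed: true, error: RuntimeError.new("boom")),
  TxResult.new(failed: true, error: RuntimeError.new("later"))
].freeze

# Built the same way #process builds its failure: first failure's error,
# all per-event results attached.
failure = TxResult.new(
  failed: true,
  error: event_results.find(&:failure?).error,
  event: event_results
)
success = TxResult.new(failed: false)

summary = describe(failure)
```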

#shutdown ⇒ void

This method returns an undefined value.

Shuts down the underlying processor pool.



# File 'lib/cdc/concurrent/transaction_pool.rb', line 45

def shutdown
  @processor_pool.shutdown
end
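A common way to make sure `shutdown` runs even when processing raises is to wrap pool usage in `ensure`. The sketch below shows that pattern against a hypothetical `SketchPool` exposing the same two public methods. Whether the real pool requires an explicit shutdown, or tolerates repeated calls, is not stated on this page.

```ruby
# Hypothetical stand-in with the two public methods documented above.
class SketchPool
  attr_reader :shut_down

  def initialize
    @shut_down = false
  end

  def process(transaction)
    raise ArgumentError, "empty transaction" if transaction.empty?

    transaction.size
  end

  def shutdown
    @shut_down = true
  end
end

# Yields the pool and always shuts it down, even when the block raises.
def with_pool(pool)
  yield pool
ensure
  pool.shutdown
end

pool  = SketchPool.new
count = with_pool(pool) { |p| p.process(%i[insert update]) }
```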