Class: CDC::Parallel::TransactionPool

Inherits:
Object
Defined in:
lib/cdc/parallel/transaction_pool.rb

Overview

Note:

This class preserves the transaction as a result boundary. More advanced ordering, checkpointing, retry, and atomic sink semantics belong to higher-level runtime/sink contracts.

Processes a `CDC::Core::TransactionEnvelope` as one transaction-oriented work unit.

`TransactionPool` uses `ProcessorPool` to process the events inside an envelope and then collapses the event-level results into one `CDC::Core::ProcessorResult` for the whole transaction.

This class preserves the transaction boundary at the API level: callers submit a transaction envelope and receive a single success or failure result. Event results inside the transaction are still produced by the configured processor and are returned as the success value when every event succeeds.

Examples:

Processing a transaction envelope

pool = CDC::Parallel::TransactionPool.new(
  processor: AuditProcessor.new,
  size: 4
)

result = pool.process(transaction)
result.success? #=> true


Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Create a transaction pool.

Parameters:

  • processor (CDC::Core::Processor)

    Ractor-safe processor used for each event inside the transaction.

  • size (Integer) (defaults to: Etc.nprocessors)

    Number of worker Ractors in the underlying processor pool.

  • timeout (Numeric, nil) (defaults to: nil)

    Optional timeout in seconds for result collection and shutdown waits.
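The parameters above can be illustrated with plain Ruby. This is a minimal sketch, not the library's own check: it assumes that "Ractor-safe" can be approximated by the object being shareable between Ractors, and `UpcaseProcessor` with its `call` method is a hypothetical stand-in rather than the `CDC::Core::Processor` interface. Only `Ractor.shareable?`, `Ractor.make_shareable`, and `Etc.nprocessors` are real Ruby APIs here.

```ruby
require "etc"

# Hypothetical processor used only for illustration; the method name `call`
# is an assumption, not the documented CDC::Core::Processor contract.
class UpcaseProcessor
  def initialize(prefix)
    @prefix = prefix
  end

  def call(event)
    "#{@prefix}#{event.to_s.upcase}"
  end
end

processor = UpcaseProcessor.new("audit:")

# A freshly built, unfrozen instance is not shareable between Ractors.
shareable_before = Ractor.shareable?(processor)

# make_shareable deep-freezes the object and everything it references.
Ractor.make_shareable(processor)
shareable_after = Ractor.shareable?(processor)

# The documented default for `size:` is the number of available processors.
default_size = Etc.nprocessors
```

A processor that keeps mutable state in instance variables would raise a `FrozenError` on its first write after `make_shareable`, which is one way to surface unsafe state before handing the object to a pool.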

Raises:



# File 'lib/cdc/parallel/transaction_pool.rb', line 44

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  @processor_pool = ProcessorPool.new(processor:, size:, timeout:)
end

Instance Method Details

#process(transaction) ⇒ CDC::Core::ProcessorResult

Process all events inside a transaction envelope.

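The returned result is successful only when every event result succeeds. If any event fails, the transaction result is a failure that carries the error of the first failed event and the complete event-result list as context. If that first failed result has no error attached, a `ProcessorExecutionError` is substituted.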

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

    Transaction envelope whose `events` will be processed.

Returns:

  • (CDC::Core::ProcessorResult)

    Success containing the ordered event results, or failure containing the first event error.



# File 'lib/cdc/parallel/transaction_pool.rb', line 59

def process(transaction)
  results = @processor_pool.process_many(transaction.events).freeze
  failure = results.find(&:failure?)

  if failure
    error = failure.error || ProcessorExecutionError.new(
      original_class: "CDC::Core::ProcessorResult",
      original_message: "failed processor result did not include an error"
    )

    return CDC::Core::ProcessorResult.failure(error, event: results)
  end

  CDC::Core::ProcessorResult.success(results)
end
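The collapse step in the listing above can be tried in isolation. The following is a self-contained sketch under stated assumptions: `SketchResult` is a hypothetical stand-in that models only the members the listing uses (`success?`, `failure?`, `error`, and an `event` slot for the context list), and it is not the real `CDC::Core::ProcessorResult`.

```ruby
# Hypothetical stand-in for CDC::Core::ProcessorResult; only the members
# needed for this illustration are modeled.
SketchResult = Struct.new(:ok, :value, :error, :event, keyword_init: true) do
  def success?
    ok
  end

  def failure?
    !ok
  end

  def self.success(value)
    new(ok: true, value: value)
  end

  def self.failure(error, event: nil)
    new(ok: false, error: error, event: event)
  end
end

# Collapse event-level results into one transaction-level result:
# success only when every event succeeded, otherwise the first failure wins
# and the full result list travels along as context.
def collapse(event_results)
  results = event_results.dup.freeze
  failure = results.find(&:failure?)
  return SketchResult.failure(failure.error, event: results) if failure

  SketchResult.success(results)
end

all_ok = collapse([SketchResult.success(:a), SketchResult.success(:b)])

mixed = collapse([
  SketchResult.success(:a),
  SketchResult.failure(StandardError.new("first")),
  SketchResult.failure(StandardError.new("second"))
])
```

In this sketch `all_ok` succeeds and holds both event results, while `mixed` fails with the error of the first failed event even though a later event also failed. The sketch leaves out the fallback error that the real method substitutes when a failed result has no error attached.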

#shutdown ⇒ void

This method returns an undefined value.

Shut down worker resources.

Delegates to the underlying ProcessorPool. Calling it more than once is safe because the underlying pool's shutdown is idempotent.



# File 'lib/cdc/parallel/transaction_pool.rb', line 81

def shutdown
  @processor_pool.shutdown
end
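Because shutdown is idempotent, one common pattern is to release the pool in an `ensure` block without tracking whether it was already shut down. The sketch below assumes a hypothetical `SketchPool` stand-in and a hypothetical `with_pool` helper, neither of which is part of the library. It shows only the calling pattern, not how `ProcessorPool` manages its workers.

```ruby
# Hypothetical stand-in for a pool whose shutdown is idempotent.
class SketchPool
  attr_reader :shutdown_calls, :releases

  def initialize
    @shutdown_calls = 0
    @releases = 0
    @closed = false
  end

  # Pretend to process a transaction by transforming each event.
  def process(transaction)
    transaction.fetch(:events).map { |event| event.to_s.upcase }
  end

  # Release resources once; later calls are no-ops.
  def shutdown
    @shutdown_calls += 1
    return if @closed

    @closed = true
    @releases += 1
    nil
  end
end

# Hypothetical helper: always shut the pool down, even if the block raises.
def with_pool(pool)
  yield pool
ensure
  pool.shutdown
end

pool = SketchPool.new
output = with_pool(pool) { |p| p.process({ events: %w[insert update] }) }

# A second, redundant shutdown is harmless because shutdown is idempotent.
pool.shutdown
```

With an idempotent shutdown, cleanup code in different layers (a helper like this one, a signal handler, an `at_exit` hook) can each call it without coordinating.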