Class: CDC::Parallel::TransactionPool

Inherits:
Object
Defined in:
lib/cdc/parallel/transaction_pool.rb,
sig/cdc/parallel/transaction_pool.rbs

Overview

Processes the events of a TransactionEnvelope as a single ordering-preserving unit, delegating the work to an internal ProcessorPool.

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil, supervision: true, max_respawns: 3, respawn_window: 60, respawn_cooldown: 5, manage_lifecycle: true) ⇒ Object

Parameters:

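  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Numeric, nil) (defaults to: nil)
  • supervision (defaults to: true)
  • max_respawns (defaults to: 3)
  • respawn_window (defaults to: 60)
  • respawn_cooldown (defaults to: 5)
  • manage_lifecycle (Boolean) (defaults to: true)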


# File 'lib/cdc/parallel/transaction_pool.rb', line 57

def initialize(
  processor:,
  size: Etc.nprocessors,
  timeout: nil,
  supervision: true,
  max_respawns: 3,
  respawn_window: 60,
  respawn_cooldown: 5,
  manage_lifecycle: true
)
  @processor_pool = ProcessorPool.new(
    processor:,
    size:,
    timeout:,
    supervision:,
    max_respawns:,
    respawn_window:,
    respawn_cooldown:,
    manage_lifecycle:
  )
end

Instance Method Details

#process(transaction) ⇒ CDC::Core::ProcessorResult

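Processes all events inside a transaction envelope. Returns a success result wrapping the frozen list of per-event results, or, if any event failed, a failure result carrying the error of the first failing result.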

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


# File 'lib/cdc/parallel/transaction_pool.rb', line 90

def process(transaction)
  results = @processor_pool.process_many(transaction.events).freeze
  failure = results.find(&:failure?)

  if failure
    error = failure.error || ProcessorExecutionError.new(
      original_class: "CDC::Core::ProcessorResult",
      original_message: "failed processor result did not include an error"
    )

    return CDC::Core::ProcessorResult.failure(error, event: results)
  end

  CDC::Core::ProcessorResult.success(results)
end
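
The first-failure aggregation in #process can be tried in isolation with the sketch below. It does not use the library: SketchResult is a hypothetical stand-in for CDC::Core::ProcessorResult, aggregate is a hypothetical helper that mirrors only the branching shown above, and RuntimeError stands in for ProcessorExecutionError.

```ruby
# Hypothetical stand-in for CDC::Core::ProcessorResult (not the library's class).
SketchResult = Struct.new(:ok, :value, :error, :event, keyword_init: true) do
  def self.success(value)
    new(ok: true, value: value)
  end

  def self.failure(error, event: nil)
    new(ok: false, error: error, event: event)
  end

  def failure?
    !ok
  end
end

# Hypothetical helper mirroring the aggregation step of #process.
def aggregate(results)
  results = results.freeze
  failure = results.find(&:failure?)

  if failure
    # Fall back to a generic error when the failed result carries none.
    error = failure.error ||
            RuntimeError.new("failed processor result did not include an error")
    return SketchResult.failure(error, event: results)
  end

  SketchResult.success(results)
end

ok_results = [SketchResult.success(1), SketchResult.success(2)]
mixed_results = [
  SketchResult.success(1),
  SketchResult.failure(ArgumentError.new("bad row")),
  SketchResult.failure(IOError.new("later failure"))
]

all_ok = aggregate(ok_results)
first_failed = aggregate(mixed_results)
```

Only the first failing result determines the reported error; the later failures remain reachable through the full result list attached to the failure.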

#shutdown ⇒ void

This method returns an undefined value.

Shut down worker resources.



# File 'lib/cdc/parallel/transaction_pool.rb', line 112

def shutdown
  @processor_pool.shutdown
end
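
One common way to make sure #shutdown runs even when processing raises is an ensure block. The sketch below is a general Ruby pattern, not something this page prescribes: SketchPool and with_pool are hypothetical stand-ins, and whether manage_lifecycle: true already covers this case is not stated here.

```ruby
# Hypothetical stand-in for a pool that owns worker resources.
class SketchPool
  attr_reader :shut_down

  def initialize
    @shut_down = false
  end

  # Raises on an empty batch so the example can exercise the error path.
  def process(events)
    raise ArgumentError, "empty transaction" if events.empty?

    events.map { |event| event.to_s.upcase }
  end

  def shutdown
    @shut_down = true
    nil
  end
end

# Hypothetical helper: yields the pool and always shuts it down afterwards.
def with_pool(pool)
  yield pool
ensure
  pool.shutdown
end

pool = SketchPool.new
outcome =
  begin
    with_pool(pool) { |p| p.process([]) }
  rescue ArgumentError => e
    "rescued: #{e.message}"
  end
```

The error still propagates to the caller; the ensure clause only guarantees that the shutdown call happens first.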