Class: CDC::Core::CompositeProcessor

Inherits:
Processor
  • Object
show all
Defined in:
lib/cdc/core/composite_processor.rb

Overview

Processor that delegates each event to multiple processors.

CompositeProcessor enables fan-out processing while preserving a simple sequential execution model. It normalizes truthy/falsey processor returns into ProcessorResult objects and can stop at the first failure.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Processor

ractor_safe!, ractor_safe?, #ractor_safe?

Constructor Details

#initialize(processors, fail_fast: true) ⇒ CompositeProcessor

Build a composite processor.

Parameters:

  • processors (Array<#process>)

    processors to execute

  • fail_fast (Boolean) (defaults to: true)

    whether to stop after the first failure



19
20
21
22
# File 'lib/cdc/core/composite_processor.rb', line 19

def initialize(processors, fail_fast: true) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @fail_fast = fail_fast
end

Instance Attribute Details

#fail_fastArray<Processor>, Boolean (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event

  • (Boolean)

    whether processing stops on the first failure



13
14
15
# File 'lib/cdc/core/composite_processor.rb', line 13

def fail_fast
  @fail_fast
end

#processorsArray<Processor>, Boolean (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event

  • (Boolean)

    whether processing stops on the first failure



13
14
15
# File 'lib/cdc/core/composite_processor.rb', line 13

def processors
  @processors
end

Instance Method Details

#process(event) ⇒ Array<ProcessorResult>

Process an event through each configured processor.

Parameters:

Returns:



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/cdc/core/composite_processor.rb', line 28

def process(event)
  results = [] # : Array[ProcessorResult]
  processors.each do |processor|
    result = normalize_result(processor.process(event), event)
    results << result
    break if fail_fast && result.failure?
  rescue StandardError => e
    result = ProcessorResult.failure(e, event:)
    results << result
    break if fail_fast
  end
  Ractor.make_shareable(results.freeze) if results.none?(&:failure?)
  results.freeze
end

#ractor_safe_processorsArray<Processor>

Processors that declared Ractor safety.

Returns:



46
47
48
# File 'lib/cdc/core/composite_processor.rb', line 46

def ractor_safe_processors
  processors.select(&:ractor_safe?).freeze
end

#sequential_processorsArray<Processor>

Processors that should remain sequential in the core runtime.

Returns:



53
54
55
# File 'lib/cdc/core/composite_processor.rb', line 53

def sequential_processors
  processors.reject(&:ractor_safe?).freeze
end