Class: CDC::Core::CompositeProcessor

Inherits:
Processor
  • Object
Defined in:
lib/cdc/core/composite_processor.rb

Overview

Processor that delegates each event to multiple processors.

CompositeProcessor enables fan-out processing while preserving a simple sequential execution model. It normalizes truthy/falsey processor return values into ProcessorResult objects and, when fail_fast is true (the default), stops at the first failure.
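The fan-out behaviour described above can be sketched without the library. The example below is a minimal illustration, not the gem's implementation: SketchResult, fan_out, and the lambda processors are hypothetical stand-ins, and the real ProcessorResult may carry different fields.

```ruby
# Hypothetical stand-in for ProcessorResult; the real class may differ.
SketchResult = Struct.new(:ok, :processor_name) do
  def failure?
    !ok
  end
end

# Run each processor in order, wrapping its truthy/falsey return value
# in a result object. With fail_fast, stop after the first failure.
def fan_out(processors, event, fail_fast: true)
  results = []
  processors.each do |name, processor|
    result = SketchResult.new(processor.call(event) ? true : false, name)
    results << result
    break if fail_fast && result.failure?
  end
  results.freeze
end

# Assumed processors: any object responding to #call works in this sketch.
PROCESSORS = {
  audit: ->(event) { event.key?(:id) }, # truthy when the event has an id
  index: ->(_event) { nil },            # always falsey, so it counts as a failure
  notify: ->(_event) { :sent }          # truthy symbol
}.freeze

event = { id: 1 }
stopped = fan_out(PROCESSORS, event)                    # :notify never runs
complete = fan_out(PROCESSORS, event, fail_fast: false) # all three run
```

With fail_fast enabled, stopped holds two results (audit, index). With it disabled, complete holds three, and only the index entry reports failure.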

Methods inherited from Processor

#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop

Constructor Details

#initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor

Build a composite processor.

Parameters:

  • processors (Array<#process>)

    processors to execute

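  • fail_fast (Boolean) (defaults to: true)

    whether to stop after the first failure

  • observer (Observer) (defaults to: NullObserver::INSTANCE)

    observer notified of dispatch events; nil falls back to NullObserver::INSTANCE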



# File 'lib/cdc/core/composite_processor.rb', line 20

def initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @fail_fast = fail_fast
  @observer = observer || NullObserver::INSTANCE
end
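Two details of this constructor are easy to miss: processors.freeze freezes the caller's array in place rather than a copy, and an explicit observer: nil is replaced by the default. The sketch below reproduces only the assignment logic shown above. SketchComposite and DEFAULT_OBSERVER are hypothetical names standing in for the real class and NullObserver::INSTANCE.

```ruby
# Hypothetical stand-in that copies only the constructor's assignment logic.
class SketchComposite
  DEFAULT_OBSERVER = Object.new.freeze # stands in for NullObserver::INSTANCE

  attr_reader :processors, :fail_fast, :observer

  def initialize(processors, fail_fast: true, observer: DEFAULT_OBSERVER)
    @processors = processors.freeze # Array#freeze freezes the receiver itself
    @fail_fast = fail_fast
    @observer = observer || DEFAULT_OBSERVER
  end
end

list = [->(event) { event }]
composite = SketchComposite.new(list, observer: nil)

list.frozen?                                                 # true: same object as composite.processors
composite.observer.equal?(SketchComposite::DEFAULT_OBSERVER) # true: nil fell back to the default

# Pass a copy when the original array must stay mutable.
mutable = [->(event) { event }]
SketchComposite.new(mutable.dup)
mutable << ->(event) { event } # still allowed, only the copy was frozen
```

Callers that build the processor list incrementally may therefore want to pass a dup, since appending to the original after construction would raise FrozenError.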

Instance Attribute Details

#fail_fast ⇒ Boolean (readonly)

Returns:

  • (Boolean)

    whether processing stops on the first failure



# File 'lib/cdc/core/composite_processor.rb', line 14

def fail_fast
  @fail_fast
end

#observer ⇒ Observer (readonly)

Returns:

  • (Observer)

    observer notified of dispatch events



# File 'lib/cdc/core/composite_processor.rb', line 14

def observer
  @observer
end

#processors ⇒ Array<Processor> (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event



# File 'lib/cdc/core/composite_processor.rb', line 14

def processors
  @processors
end

Instance Method Details

#process(event) ⇒ Array<ProcessorResult>

Process an event through each configured processor.

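Parameters:

  • event

    event passed to each configured processor

Returns:

  • (Array<ProcessorResult>)

    frozen array of results from the processors that ran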



# File 'lib/cdc/core/composite_processor.rb', line 30

def process(event)
  observer.dispatch_started(event)
  results = collect_results(event)
  final_results = results.freeze
  observe_results(final_results)
  Ractor.make_shareable(final_results) if results.none?(&:failure?)
  final_results
end
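The method freezes the result array unconditionally but calls Ractor.make_shareable only when no result is a failure. The difference matters because freeze is shallow while make_shareable freezes the whole object graph. The sketch below illustrates that distinction on Ruby 3.0 or later. SketchOutcome and finalize_results are hypothetical names, and the real ProcessorResult may hold different data.

```ruby
# Hypothetical result type with a mutable string field.
SketchOutcome = Struct.new(:message, :failed) do
  def failure?
    failed
  end
end

# Mirrors the freeze / make_shareable steps of #process shown above.
def finalize_results(results)
  frozen = results.freeze                                   # shallow: only the array
  Ractor.make_shareable(frozen) if frozen.none?(&:failure?) # deep: elements and their fields
  frozen
end

success = finalize_results([SketchOutcome.new(+"stored", false)])
mixed = finalize_results([SketchOutcome.new(+"timeout", true)])

Ractor.shareable?(success)    # true: the array, struct, and string are all frozen
success.first.message.frozen? # true
Ractor.shareable?(mixed)      # false: the array is frozen, its elements are not
mixed.first.message.frozen?   # false
```

In this sketch, a result set containing a failure comes back frozen at the top level only, so code that hands results to another Ractor would need to check Ractor.shareable? first.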

#ractor_safe_processors ⇒ Array<Processor>

Processors that declared Ractor safety.

Returns:

  • (Array<Processor>)

    frozen array of processors for which ractor_safe? is true



# File 'lib/cdc/core/composite_processor.rb', line 42

def ractor_safe_processors
  processors.select(&:ractor_safe?).freeze
end

#sequential_processors ⇒ Array<Processor>

Processors that should remain sequential in the core runtime.

Returns:

  • (Array<Processor>)

    frozen array of processors for which ractor_safe? is false



# File 'lib/cdc/core/composite_processor.rb', line 49

def sequential_processors
  processors.reject(&:ractor_safe?).freeze
end
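Because one method selects on ractor_safe? and the other rejects on it, the two arrays together cover every configured processor exactly once, in original order within each group. A minimal sketch of the same split in one pass, using Enumerable#partition, is shown below. SketchProcessor and split_by_ractor_safety are hypothetical names, not part of the library.

```ruby
# Hypothetical processor stand-in exposing only the ractor_safe? predicate.
SketchProcessor = Struct.new(:name, :safe) do
  def ractor_safe?
    safe
  end
end

# Split processors into [ractor_safe, sequential], freezing both groups.
def split_by_ractor_safety(processors)
  safe, sequential = processors.partition(&:ractor_safe?)
  [safe.freeze, sequential.freeze]
end

processors = [
  SketchProcessor.new(:metrics, true),
  SketchProcessor.new(:database_writer, false),
  SketchProcessor.new(:formatter, true)
]

safe, sequential = split_by_ractor_safety(processors)
safe.map(&:name)       # [:metrics, :formatter]
sequential.map(&:name) # [:database_writer]
```

Both documented methods build a new frozen array on every call, so a caller that needs the groups repeatedly may prefer to compute them once and keep the result.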