Class: CDC::Core::CompositeProcessor

Inherits:
Processor
  • Object
show all
Defined in:
lib/cdc/core/composite_processor.rb

Overview

Fan-out processor that delegates the same input to multiple processors.

CompositeProcessor is for independent downstream side effects. Every configured processor receives the same input, and their results are collected independently. Use ProcessorChain when Processor B must receive Processor A’s output.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Processor

#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop

Constructor Details

#initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor

Build a composite processor.

Parameters:

  • processors (Array<#process>)

    processors to execute

  • fail_fast (Boolean) (defaults to: true)

    whether to stop after the first failure



21
22
23
24
25
# File 'lib/cdc/core/composite_processor.rb', line 21

def initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @fail_fast = fail_fast
  @observer = observer || NullObserver::INSTANCE
end

Instance Attribute Details

#fail_fastArray<Processor>, ... (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event

  • (Boolean)

    whether processing stops on the first failure

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/composite_processor.rb', line 15

def fail_fast
  @fail_fast
end

#observerArray<Processor>, ... (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event

  • (Boolean)

    whether processing stops on the first failure

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/composite_processor.rb', line 15

def observer
  @observer
end

#processorsArray<Processor>, ... (readonly)

Returns:

  • (Array<Processor>)

    processors executed for each event

  • (Boolean)

    whether processing stops on the first failure

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/composite_processor.rb', line 15

def processors
  @processors
end

Instance Method Details

#process(event) ⇒ Array<ProcessorResult>

Process an event through each configured processor.

Parameters:

Returns:



31
32
33
34
35
36
37
38
# File 'lib/cdc/core/composite_processor.rb', line 31

def process(event)
  observer.dispatch_started(event)
  results = collect_results(event)
  final_results = results.freeze
  observe_results(final_results)
  Ractor.make_shareable(final_results) if results.none?(&:failure?)
  final_results
end

#ractor_safe_processorsArray<Processor>

Processors that declared Ractor safety.

Returns:



43
44
45
# File 'lib/cdc/core/composite_processor.rb', line 43

def ractor_safe_processors
  processors.select(&:ractor_safe?).freeze
end

#sequential_processorsArray<Processor>

Processors that should remain sequential in the core runtime.

Returns:



50
51
52
# File 'lib/cdc/core/composite_processor.rb', line 50

def sequential_processors
  processors.reject(&:ractor_safe?).freeze
end