Class: CDC::Core::CompositeProcessor
- Defined in: lib/cdc/core/composite_processor.rb
Overview
Fan-out processor that delegates the same input to multiple processors.
CompositeProcessor is for independent downstream side effects. Every configured processor receives the same input, and their results are collected independently. Use ProcessorChain when Processor B must receive Processor A’s output.
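The difference between fan-out and chaining can be sketched in a few lines of plain Ruby. This is only an illustration: `fan_out`, `chain`, and the two lambdas are hypothetical stand-ins, not part of CDC::Core, and the real classes work with processor objects and ProcessorResult values rather than bare lambdas.

```ruby
# Hypothetical stand-ins for processors; not the CDC::Core classes.
upcase  = ->(input) { input.upcase }
exclaim = ->(input) { "#{input}!" }

# Fan-out: every processor receives the same original input,
# and each result is collected independently.
def fan_out(processors, input)
  processors.map { |processor| processor.call(input) }
end

# Chain: each processor receives the previous processor's output.
def chain(processors, input)
  processors.reduce(input) { |value, processor| processor.call(value) }
end

processors = [upcase, exclaim]
fan_out_results = fan_out(processors, "row") # => ["ROW", "row!"]
chain_result    = chain(processors, "row")   # => "ROW!"
```

In the fan-out case the second lambda never sees the upcased string, which is the behavior the overview describes for independent side effects.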
Instance Attribute Summary
- #fail_fast ⇒ Array<Processor>, ... readonly
- #observer ⇒ Array<Processor>, ... readonly
- #processors ⇒ Array<Processor>, ... readonly
Instance Method Summary
- #initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor (constructor)
  Build a composite processor.
- #process(event) ⇒ Array<ProcessorResult>
  Process an event through each configured processor.
- #ractor_safe_processors ⇒ Array<Processor>
  Processors that declared Ractor safety.
- #sequential_processors ⇒ Array<Processor>
  Processors that should remain sequential in the core runtime.
Methods inherited from Processor
#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop
Constructor Details
#initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor
Build a composite processor.
    # File 'lib/cdc/core/composite_processor.rb', line 21
    def initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
      @processors = processors.freeze
      @fail_fast = fail_fast
      @observer = observer || NullObserver::INSTANCE
    end
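Two details of the constructor are easy to miss: `processors.freeze` freezes the caller's own array (`freeze` returns the receiver, not a copy), and an explicit `observer: nil` falls back to the null observer. The sketch below reproduces just those two lines with stand-in classes. `SketchNullObserver` and `SketchComposite` are hypothetical names used for illustration and are not the real CDC::Core classes.

```ruby
# Stand-in classes for illustration only; not the real CDC::Core classes.
class SketchNullObserver
  INSTANCE = new.freeze
end

class SketchComposite
  attr_reader :processors, :fail_fast, :observer

  def initialize(processors, fail_fast: true, observer: SketchNullObserver::INSTANCE)
    @processors = processors.freeze                      # freezes the array that was passed in
    @fail_fast = fail_fast
    @observer = observer || SketchNullObserver::INSTANCE # nil falls back to the null observer
  end
end

list = [:a, :b]
composite = SketchComposite.new(list, observer: nil)

list.frozen?                                            # => true
composite.processors.equal?(list)                       # => true (same object, not a copy)
composite.observer.equal?(SketchNullObserver::INSTANCE) # => true
```

A caller that still needs to mutate its own list afterwards would have to pass a copy, for example `list.dup`.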
Instance Attribute Details
#fail_fast ⇒ Array<Processor>, ... (readonly)
    # File 'lib/cdc/core/composite_processor.rb', line 15
    def fail_fast
      @fail_fast
    end
#observer ⇒ Array<Processor>, ... (readonly)
    # File 'lib/cdc/core/composite_processor.rb', line 15
    def observer
      @observer
    end
#processors ⇒ Array<Processor>, ... (readonly)
    # File 'lib/cdc/core/composite_processor.rb', line 15
    def processors
      @processors
    end
Instance Method Details
#process(event) ⇒ Array<ProcessorResult>
Process an event through each configured processor.
    # File 'lib/cdc/core/composite_processor.rb', line 31
    def process(event)
      observer.dispatch_started(event)
      results = collect_results(event)
      final_results = results.freeze
      observe_results(final_results)
      Ractor.make_shareable(final_results) if results.none?(&:failure?)
      final_results
    end
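The last two steps of `#process` treat the result array differently depending on whether any result failed. The array itself is always frozen, but `Ractor.make_shareable` (Ruby 3.0 or newer) additionally deep-freezes every element, and it only runs when no result reports `failure?`. The sketch below isolates that behavior. `SketchResult` and `finalize` are hypothetical stand-ins, and the private helpers `collect_results` and `observe_results` are omitted because this page does not show them.

```ruby
# Requires Ruby 3.0+ for Ractor.make_shareable / Ractor.shareable?.
# SketchResult is a hypothetical stand-in for ProcessorResult.
SketchResult = Struct.new(:status, :payload) do
  def failure?
    status == :failure
  end
end

# Mirrors only the freezing logic at the end of #process.
def finalize(results)
  final_results = results.freeze # shallow: freezes the array only
  Ractor.make_shareable(final_results) if results.none?(&:failure?) # deep freeze
  final_results
end

all_ok       = finalize([SketchResult.new(:ok, +"a"), SketchResult.new(:ok, +"b")])
with_failure = finalize([SketchResult.new(:ok, +"a"), SketchResult.new(:failure, +"boom")])

all_ok_shareable       = Ractor.shareable?(all_ok)       # => true
with_failure_shareable = Ractor.shareable?(with_failure) # => false
with_failure.frozen?                                     # => true (array only)
with_failure.first.frozen?                               # => false (elements untouched)
```

Under these assumptions, callers can rely on the returned array being frozen in both cases, but can only pass it across Ractors when every processor succeeded.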
#ractor_safe_processors ⇒ Array<Processor>
Processors that declared Ractor safety.
    # File 'lib/cdc/core/composite_processor.rb', line 43
    def ractor_safe_processors
      processors.select(&:ractor_safe?).freeze
    end
#sequential_processors ⇒ Array<Processor>
Processors that should remain sequential in the core runtime.
    # File 'lib/cdc/core/composite_processor.rb', line 50
    def sequential_processors
      processors.reject(&:ractor_safe?).freeze
    end
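Together, `#ractor_safe_processors` and `#sequential_processors` split the configured list into two disjoint, frozen arrays based on each processor's `ractor_safe?` answer. A minimal sketch of that partition follows. `SketchProcessor` and the processor names are hypothetical stand-ins for real Processor subclasses.

```ruby
# SketchProcessor is a hypothetical stand-in for a Processor subclass.
SketchProcessor = Struct.new(:name, :ractor_safe) do
  def ractor_safe?
    ractor_safe
  end
end

processors = [
  SketchProcessor.new("metrics", true),
  SketchProcessor.new("db_writer", false),
  SketchProcessor.new("audit_log", true)
].freeze

# Same select/reject pair as the two methods above.
ractor_safe = processors.select(&:ractor_safe?).freeze
sequential  = processors.reject(&:ractor_safe?).freeze

ractor_safe.map(&:name) # => ["metrics", "audit_log"]
sequential.map(&:name)  # => ["db_writer"]
```

Because `select` and `reject` build a fresh array on every call, each invocation returns a new frozen array while the original order of the processors is preserved within each group.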