Class: CDC::Core::CompositeProcessor
- Defined in: lib/cdc/core/composite_processor.rb
Overview
Processor that delegates each event to multiple processors.
CompositeProcessor enables fan-out processing while preserving a simple sequential execution model: the configured processors handle each event one after another. It normalizes truthy/falsey processor returns into ProcessorResult objects and, when fail_fast is true (the default), can stop at the first failure.
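The overview can be illustrated with a small, self-contained sketch. It does not use the real CDC classes: SketchResult and SketchComposite are hypothetical stand-ins, and the choice to keep the failing result in the returned list before stopping is an assumption, since collect_results is not shown on this page.

```ruby
# Hypothetical stand-ins; the real ProcessorResult and CompositeProcessor
# internals are not shown on this page.
SketchResult = Struct.new(:processor_name, :ok) do
  def failure?
    !ok
  end
end

class SketchComposite
  def initialize(processors, fail_fast: true)
    @processors = processors
    @fail_fast = fail_fast
  end

  # Runs processors in order and wraps each truthy/falsey return in a result.
  def process(event)
    results = []
    @processors.each do |name, callable|
      result = SketchResult.new(name, callable.call(event) ? true : false)
      results << result
      # Assumption: the failing result is recorded, then the loop stops.
      break if @fail_fast && result.failure?
    end
    results
  end
end

sketch_processors = {
  audit: ->(event) { event[:id] }, # truthy return, treated as success
  index: ->(_event) { nil },       # falsey return, treated as failure
  notify: ->(_event) { true }      # not reached when fail_fast is on
}

fast = SketchComposite.new(sketch_processors).process({ id: 1 })
full = SketchComposite.new(sketch_processors, fail_fast: false).process({ id: 1 })

puts fast.map(&:processor_name).inspect # [:audit, :index]
puts full.map(&:processor_name).inspect # [:audit, :index, :notify]
```

With fail_fast disabled, every processor in the sketch runs and the caller inspects the full result list for failures.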
Instance Attribute Summary
- #fail_fast ⇒ Array<Processor>, ... readonly
- #observer ⇒ Array<Processor>, ... readonly
- #processors ⇒ Array<Processor>, ... readonly
Instance Method Summary
- #initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor constructor
  Build a composite processor.
- #process(event) ⇒ Array<ProcessorResult>
  Process an event through each configured processor.
- #ractor_safe_processors ⇒ Array<Processor>
  Processors that declared Ractor safety.
- #sequential_processors ⇒ Array<Processor>
  Processors that should remain sequential in the core runtime.
Methods inherited from Processor
#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop
Constructor Details
#initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) ⇒ CompositeProcessor
Build a composite processor.
# File 'lib/cdc/core/composite_processor.rb', line 20
def initialize(processors, fail_fast: true, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @fail_fast = fail_fast
  @observer = observer || NullObserver::INSTANCE
end
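Two details of the constructor body are easy to miss: processors.freeze freezes the array the caller passed in (Array#freeze returns the receiver, not a copy), and an explicit observer: nil falls back to the null observer. The sketch below mirrors only those lines; ConstructorSketch and SketchNullObserver are hypothetical names, not part of the library.

```ruby
# Hypothetical stand-ins that mirror only the constructor body shown above.
module SketchNullObserver
  INSTANCE = Object.new.freeze
end

class ConstructorSketch
  attr_reader :processors, :fail_fast, :observer

  def initialize(processors, fail_fast: true, observer: SketchNullObserver::INSTANCE)
    @processors = processors.freeze # freezes the caller's array in place
    @fail_fast = fail_fast
    @observer = observer || SketchNullObserver::INSTANCE
  end
end

list = [:a, :b]
composite = ConstructorSketch.new(list, observer: nil)

puts list.frozen?                                            # true
puts composite.processors.equal?(list)                       # true
puts composite.observer.equal?(SketchNullObserver::INSTANCE) # true

# Passing a copy keeps the original array mutable.
mutable = [:a, :b]
ConstructorSketch.new(mutable.dup)
puts mutable.frozen? # false
```

If the array is reused elsewhere after construction, passing a dup as in the last lines avoids a FrozenError on later mutation.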
Instance Attribute Details
#fail_fast ⇒ Array<Processor>, ... (readonly)
# File 'lib/cdc/core/composite_processor.rb', line 14
def fail_fast
  @fail_fast
end
#observer ⇒ Array<Processor>, ... (readonly)
# File 'lib/cdc/core/composite_processor.rb', line 14
def observer
  @observer
end
#processors ⇒ Array<Processor>, ... (readonly)
# File 'lib/cdc/core/composite_processor.rb', line 14
def processors
  @processors
end
Instance Method Details
#process(event) ⇒ Array<ProcessorResult>
Process an event through each configured processor.
# File 'lib/cdc/core/composite_processor.rb', line 30
def process(event)
  observer.dispatch_started(event)
  results = collect_results(event)
  final_results = results.freeze
  observe_results(final_results)
  Ractor.make_shareable(final_results) if results.none?(&:failure?)
  final_results
end
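The last lines of #process combine a shallow freeze with a conditional deep freeze. The sketch below isolates that tail, assuming Ruby 3.0 or later for Ractor.make_shareable and Ractor.shareable?; SketchOutcome and sketch_finalize are hypothetical names standing in for ProcessorResult and the end of #process.

```ruby
# Hypothetical result type; the real ProcessorResult is not shown here.
SketchOutcome = Struct.new(:name, :ok) do
  def failure?
    !ok
  end
end

# Mirrors the tail of #process: always freeze the array, and deep-freeze
# it only when no result reports a failure.
def sketch_finalize(results)
  final_results = results.freeze
  Ractor.make_shareable(final_results) if results.none?(&:failure?)
  final_results
end

clean = sketch_finalize([SketchOutcome.new("audit", true)])
puts clean.frozen?            # true
puts clean.first.frozen?      # true: elements were deep-frozen
puts Ractor.shareable?(clean) # true

mixed = sketch_finalize([SketchOutcome.new("index", false)])
puts mixed.frozen?            # true: the array itself is still frozen
puts mixed.first.frozen?      # false: elements stay mutable
puts Ractor.shareable?(mixed) # false
```

In this sketch a batch containing a failure is returned frozen but not shareable, so it could not be passed to another Ractor without copying.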
#ractor_safe_processors ⇒ Array<Processor>
Processors that declared Ractor safety.
# File 'lib/cdc/core/composite_processor.rb', line 42
def ractor_safe_processors
  processors.select(&:ractor_safe?).freeze
end
#sequential_processors ⇒ Array<Processor>
Processors that should remain sequential in the core runtime.
# File 'lib/cdc/core/composite_processor.rb', line 49
def sequential_processors
  processors.reject(&:ractor_safe?).freeze
end
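Because #ractor_safe_processors uses select and #sequential_processors uses reject on the same predicate, the two lists are complementary: every processor lands in exactly one of them, in its original order. A minimal sketch, with SketchProcessor and split_by_ractor_safety as hypothetical names:

```ruby
# Hypothetical processor stand-in exposing only ractor_safe?.
SketchProcessor = Struct.new(:name, :safe) do
  def ractor_safe?
    safe
  end
end

# Applies the same select/reject pair as the two methods above.
def split_by_ractor_safety(processors)
  {
    ractor_safe: processors.select(&:ractor_safe?).freeze,
    sequential: processors.reject(&:ractor_safe?).freeze
  }
end

groups = split_by_ractor_safety(
  [
    SketchProcessor.new(:audit, true),
    SketchProcessor.new(:index, false),
    SketchProcessor.new(:notify, true)
  ]
)

puts groups[:ractor_safe].map(&:name).inspect # [:audit, :notify]
puts groups[:sequential].map(&:name).inspect  # [:index]
```

Each call builds a new frozen array, so callers that need both lists repeatedly may prefer to compute them once and reuse them.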