Class: CDC::Core::CompositeProcessor
- Defined in:
- lib/cdc/core/composite_processor.rb
Overview
Processor that delegates each event to multiple processors.
CompositeProcessor enables fan-out processing while preserving a simple sequential execution model. It normalizes truthy/falsey processor returns into ProcessorResult objects and can stop at the first failure.
Instance Attribute Summary collapse
Instance Method Summary collapse
-
#initialize(processors, fail_fast: true) ⇒ CompositeProcessor
constructor
Build a composite processor.
-
#process(event) ⇒ Array<ProcessorResult>
Process an event through each configured processor.
-
#ractor_safe_processors ⇒ Array<Processor>
Processors that declared Ractor safety.
-
#sequential_processors ⇒ Array<Processor>
Processors that should remain sequential in the core runtime.
Methods inherited from Processor
ractor_safe!, ractor_safe?, #ractor_safe?
Constructor Details
#initialize(processors, fail_fast: true) ⇒ CompositeProcessor
Build a composite processor.
19 20 21 22 |
# File 'lib/cdc/core/composite_processor.rb', line 19 def initialize(processors, fail_fast: true) # rubocop:disable Lint/MissingSuper @processors = processors.freeze @fail_fast = fail_fast end |
Instance Attribute Details
#fail_fast ⇒ Array<Processor>, Boolean (readonly)
13 14 15 |
# File 'lib/cdc/core/composite_processor.rb', line 13 def fail_fast @fail_fast end |
#processors ⇒ Array<Processor>, Boolean (readonly)
13 14 15 |
# File 'lib/cdc/core/composite_processor.rb', line 13 def processors @processors end |
Instance Method Details
#process(event) ⇒ Array<ProcessorResult>
Process an event through each configured processor.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/cdc/core/composite_processor.rb', line 28 def process(event) results = [] # : Array[ProcessorResult] processors.each do |processor| result = normalize_result(processor.process(event), event) results << result break if fail_fast && result.failure? rescue StandardError => e result = ProcessorResult.failure(e, event:) results << result break if fail_fast end Ractor.make_shareable(results.freeze) if results.none?(&:failure?) results.freeze end |
#ractor_safe_processors ⇒ Array<Processor>
Processors that declared Ractor safety.
46 47 48 |
# File 'lib/cdc/core/composite_processor.rb', line 46 def ractor_safe_processors processors.select(&:ractor_safe?).freeze end |
#sequential_processors ⇒ Array<Processor>
Processors that should remain sequential in the core runtime.
53 54 55 |
# File 'lib/cdc/core/composite_processor.rb', line 53 def sequential_processors processors.reject(&:ractor_safe?).freeze end |