Class: CDC::Core::ProcessorChain
Defined in: lib/cdc/core/processor_chain.rb
Overview
Sequential processor workflow where each successful result feeds the next processor.
ProcessorChain models dependent workflows. Unlike CompositeProcessor, which sends the same input to every processor, ProcessorChain sends the value returned by one processor to the next processor. This is useful for downstream workflows such as loading records, enriching them, and then sending the enriched payload to a sink.
Each processor result is normalized into a ProcessorResult. The chain stops at the first failure or skipped result because later processors depend on the previous processor’s successful value.
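The difference between fan-out and chaining can be sketched with plain Ruby objects. This is a minimal illustration, not the library's implementation: `SketchResult` and `BlockStep` are hypothetical stand-ins for ProcessorResult and a processor, and failure handling is left out.

```ruby
# Hypothetical stand-in for ProcessorResult; the real class may differ.
SketchResult = Struct.new(:status, :value) do
  def success?
    status == :success
  end
end

# Hypothetical processor: wraps a block and exposes #process.
class BlockStep
  def initialize(&block)
    @block = block
  end

  def process(input)
    SketchResult.new(:success, @block.call(input))
  end
end

steps = [
  BlockStep.new { |s| "#{s}+loaded" },
  BlockStep.new { |s| "#{s}+enriched" }
]

# Fan-out (CompositeProcessor style): every step sees the same input.
fan_out = steps.map { |step| step.process("rec").value }

# Chaining (ProcessorChain style): each step sees the previous value.
chained = steps.reduce("rec") { |current, step| step.process(current).value }

puts fan_out.inspect # ["rec+loaded", "rec+enriched"]
puts chained         # rec+loaded+enriched
```

In the fan-out case the second step never sees the first step's output; in the chained case it does, which is why a failed step leaves later steps with nothing valid to work on.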
Instance Attribute Summary
- #observer ⇒ Array<#process>, Observer readonly
- #processors ⇒ Array<#process>, Observer readonly
Instance Method Summary
- #initialize(processors, observer: NullObserver::INSTANCE) ⇒ ProcessorChain constructor
  Build a processor chain.
- #process(input) ⇒ ProcessorResult
  Process one input through each processor in sequence.
- #process_many(inputs) ⇒ Array<ProcessorResult>
  Process many inputs in order.
Methods inherited from Processor
#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop
Constructor Details
#initialize(processors, observer: NullObserver::INSTANCE) ⇒ ProcessorChain
Build a processor chain.
# File 'lib/cdc/core/processor_chain.rb', line 25
def initialize(processors, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @observer = observer || NullObserver::INSTANCE
end
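Two details of the constructor body are easy to miss: `processors.freeze` freezes the array the caller passed in (no copy is made), and an explicit `observer: nil` falls back to the null observer. The sketch below reproduces only those two lines; `SketchChain` and `NULL_OBSERVER` are hypothetical stand-ins, not the library's classes.

```ruby
# Hypothetical stand-in for NullObserver::INSTANCE.
NULL_OBSERVER = Object.new.freeze

# Hypothetical class that copies only the constructor body shown above.
class SketchChain
  attr_reader :processors, :observer

  def initialize(processors, observer: NULL_OBSERVER)
    @processors = processors.freeze         # freezes the caller's array in place
    @observer = observer || NULL_OBSERVER   # nil falls back to the null observer
  end
end

steps = %i[load enrich]
chain = SketchChain.new(steps, observer: nil)

puts chain.processors.equal?(steps)         # true: same array object
puts chain.observer.equal?(NULL_OBSERVER)   # true

begin
  steps << :sink
rescue FrozenError
  puts "caller's array is frozen too"
end
```

A caller that needs to keep mutating its own list would, under this reading, pass a copy such as `steps.dup`.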
Instance Attribute Details
Instance Method Details
#process(input) ⇒ ProcessorResult
Process one input through each processor in sequence.
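The first processor receives the original input. Each later processor receives the previous successful ProcessorResult#value. If a processor fails or is skipped, its result is returned immediately and the remaining processors do not run. If every processor succeeds, the chain returns a new success result whose value is the last processor's value (or the original input when the chain is empty).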
# File 'lib/cdc/core/processor_chain.rb', line 38
def process(input)
  observer.dispatch_started(input)
  current_input = input
  processors.each do |processor|
    result = process_with(processor, current_input)
    observe_result(result)
    return result unless result.success?
    current_input = result.value
  end
  ProcessorResult.success(current_input, value: current_input)
end
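The early return in the loop can be illustrated with a small stand-alone sketch. It assumes a hypothetical `StepResult` in place of ProcessorResult and a hypothetical `RecordingStep` processor, and it omits the observer calls and the `process_with` normalization, whose bodies are not shown here.

```ruby
# Hypothetical stand-in for ProcessorResult (the real class may expose more).
StepResult = Struct.new(:status, :value) do
  def success?
    status == :success
  end
end

# Hypothetical processor that records when it runs.
class RecordingStep
  def initialize(name, log, status: :success)
    @name = name
    @log = log
    @status = status
  end

  def process(input)
    @log << @name
    StepResult.new(@status, "#{input}>#{@name}")
  end
end

# Same loop shape as #process, minus the observer calls.
def run_steps(steps, input)
  current = input
  steps.each do |step|
    result = step.process(current)
    return result unless result.success?

    current = result.value
  end
  StepResult.new(:success, current)
end

log = []
steps = [
  RecordingStep.new("load", log),
  RecordingStep.new("enrich", log, status: :skipped),
  RecordingStep.new("sink", log)
]
outcome = run_steps(steps, "in")

puts outcome.status # skipped: the non-success result is returned as-is
puts log.inspect    # ["load", "enrich"]: "sink" never ran
```

Because the skipped result is handed back unchanged, a caller can inspect it to see which stage stopped the chain.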
#process_many(inputs) ⇒ Array<ProcessorResult>
Process many inputs in order.
# File 'lib/cdc/core/processor_chain.rb', line 57
def process_many(inputs)
  inputs.map { |input| process(input) }.freeze
end
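Since each input goes through `process` independently, a failure on one input does not stop the others, and the returned array is frozen. A rough sketch of that behaviour, using a hypothetical `BatchResult` and a hypothetical single-step `SketchBatchChain` in place of the real classes:

```ruby
# Hypothetical stand-in for ProcessorResult.
BatchResult = Struct.new(:status, :value) do
  def success?
    status == :success
  end
end

# Hypothetical chain whose single step fails on odd numbers.
class SketchBatchChain
  def process(input)
    return BatchResult.new(:failure, input) if input.odd?

    BatchResult.new(:success, input * 10)
  end

  # Mirrors the documented body: map each input, then freeze the array.
  def process_many(inputs)
    inputs.map { |input| process(input) }.freeze
  end
end

results = SketchBatchChain.new.process_many([2, 3, 4])

puts results.map(&:status).inspect # [:success, :failure, :success]
puts results.map(&:value).inspect  # [20, 3, 40]
puts results.frozen?               # true
```

`Array#freeze` is shallow: it prevents adding or removing results, but says nothing about whether the individual result objects are themselves immutable.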