Class: CDC::Core::Pipeline
- Inherits:
-
Object
- Object
- CDC::Core::Pipeline
- Defined in:
- lib/cdc/core/pipeline.rb
Overview
Connects filters with one processor to form a guarded processing unit.
A Pipeline evaluates all filters before invoking its processor. Matching inputs are processed, while filtered inputs produce skipped results. Use CompositeProcessor for fan-out to many processors and ProcessorChain for dependent step-by-step workflows.
Instance Attribute Summary collapse
- #filters ⇒ #process, ... readonly
- #observer ⇒ #process, ... readonly
- #processor ⇒ #process, ... readonly
Instance Method Summary collapse
-
#initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline
constructor
Build a pipeline.
-
#process(event) ⇒ ProcessorResult
Process one event through the pipeline.
-
#process_many(events) ⇒ Array<ProcessorResult>
Process many events in order.
Constructor Details
#initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline
Build a pipeline.
22 23 24 25 26 |
# File 'lib/cdc/core/pipeline.rb', line 22 def initialize(processor:, filters: [], observer: NullObserver::INSTANCE) @processor = processor @filters = filters.freeze @observer = observer || NullObserver::INSTANCE end |
Instance Attribute Details
#filters ⇒ #process, ... (readonly)
15 16 17 |
# File 'lib/cdc/core/pipeline.rb', line 15 def filters @filters end |
#observer ⇒ #process, ... (readonly)
15 16 17 |
# File 'lib/cdc/core/pipeline.rb', line 15 def observer @observer end |
#processor ⇒ #process, ... (readonly)
15 16 17 |
# File 'lib/cdc/core/pipeline.rb', line 15 def processor @processor end |
Instance Method Details
#process(event) ⇒ ProcessorResult
Process one event through the pipeline.
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/cdc/core/pipeline.rb', line 32 def process(event) observer.dispatch_started(event) return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event) result = normalize_result(processor.process(event), event) observe_result(result) result rescue StandardError => e result = ProcessorResult.failure(e, event:, processor: processor.class.name) observer.dispatch_failed(result) result end |
#process_many(events) ⇒ Array<ProcessorResult>
Process many events in order.
49 50 51 |
# File 'lib/cdc/core/pipeline.rb', line 49 def process_many(events) events.map { |event| process(event) }.freeze end |