Class: CDC::Core::Pipeline
- Inherits:
-
Object
- Object
- CDC::Core::Pipeline
- Defined in:
- lib/cdc/core/pipeline.rb
Overview
Connects filters with a processor to form an event-processing unit.
A Pipeline first evaluates all filters. Matching events are handed to the processor, while filtered events produce skipped results. Processor errors are captured as failure results instead of escaping to the caller.
Instance Attribute Summary collapse
- #filters ⇒ #process, Array<Filter> readonly
- #processor ⇒ #process, Array<Filter> readonly
Instance Method Summary collapse
-
#initialize(processor:, filters: []) ⇒ Pipeline
constructor
Build a pipeline.
-
#process(event) ⇒ ProcessorResult
Process one event through the pipeline.
-
#process_many(events) ⇒ Array<ProcessorResult>
Process many events in order.
Constructor Details
#initialize(processor:, filters: []) ⇒ Pipeline
Build a pipeline.
19 20 21 22 |
# File 'lib/cdc/core/pipeline.rb', line 19 def initialize(processor:, filters: []) @processor = processor @filters = filters.freeze end |
Instance Attribute Details
Instance Method Details
#process(event) ⇒ ProcessorResult
Process one event through the pipeline.
28 29 30 31 32 33 34 |
# File 'lib/cdc/core/pipeline.rb', line 28 def process(event) return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event) normalize_result(processor.process(event), event) rescue StandardError => e ProcessorResult.failure(e, event:) end |
#process_many(events) ⇒ Array<ProcessorResult>
Process many events in order.
40 41 42 |
# File 'lib/cdc/core/pipeline.rb', line 40 def process_many(events) events.map { |event| process(event) }.freeze end |