Class: CDC::Core::Pipeline
- Inherits:
- Object
- Defined in:
- lib/cdc/core/pipeline.rb
Overview
Connects filters with a processor to form an event-processing unit.
A Pipeline first evaluates all filters. Events that match are handed to the processor; events that do not match are returned as skipped results with the reason 'filtered'. Processor errors (StandardError and its subclasses) are captured as failure results instead of escaping to the caller.
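The flow described above can be sketched with a small stand-in class. This is a minimal illustration, not the library's implementation: `MiniPipeline`, `Result`, and the lambda-based filter and processor are hypothetical names chosen for the example, and real filters and processors in CDC::Core may expose a different interface.

```ruby
# Hypothetical stand-ins; none of these names come from CDC::Core.
Result = Struct.new(:status, :event, :detail, keyword_init: true)

class MiniPipeline
  def initialize(processor:, filters: [])
    @processor = processor
    @filters = filters
  end

  def process(event)
    # Filters run first; if any filter rejects the event, it is skipped.
    unless @filters.all? { |filter| filter.call(event) }
      return Result.new(status: :skipped, event: event, detail: 'filtered')
    end

    Result.new(status: :success, event: event, detail: @processor.call(event))
  rescue StandardError => e
    # Errors become failure results rather than propagating to the caller.
    Result.new(status: :failure, event: event, detail: e.message)
  end
end

only_users  = ->(event) { event[:table] == 'users' }
upcase_name = ->(event) { event.fetch(:name).upcase }
pipeline    = MiniPipeline.new(processor: upcase_name, filters: [only_users])

ok      = pipeline.process({ table: 'users', name: 'ada' })
skipped = pipeline.process({ table: 'orders', name: 'x' })
failed  = pipeline.process({ table: 'users' }) # missing :name raises KeyError

puts ok.status      # success
puts skipped.status # skipped
puts failed.status  # failure
```

Returning a result object for all three outcomes lets a caller handle success, skip, and failure uniformly without wrapping each call in its own `begin`/`rescue`.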
Instance Attribute Summary
- #filters ⇒ #process, ... readonly
- #observer ⇒ #process, ... readonly
- #processor ⇒ #process, ... readonly
Instance Method Summary
- #initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline constructor
  Build a pipeline.
- #process(event) ⇒ ProcessorResult
  Process one event through the pipeline.
- #process_many(events) ⇒ Array<ProcessorResult>
  Process many events in order.
Constructor Details
#initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline
Build a pipeline.
    # File 'lib/cdc/core/pipeline.rb', line 21
    def initialize(processor:, filters: [], observer: NullObserver::INSTANCE)
      @processor = processor
      @filters = filters.freeze
      @observer = observer || NullObserver::INSTANCE
    end
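Two details of the constructor are easy to miss: `filters.freeze` freezes the array the caller passed in (it does not copy it), and an explicit `observer: nil` falls back to the null observer. The sketch below mirrors the constructor with hypothetical stand-in classes (`PipelineSketch`, `NullObserverSketch`) so that it runs on its own; it is an illustration of those two Ruby behaviors, not the library's code.

```ruby
# Hypothetical stand-ins mirroring the constructor; not part of CDC::Core.
class NullObserverSketch
  INSTANCE = new.freeze
end

class PipelineSketch
  attr_reader :processor, :filters, :observer

  def initialize(processor:, filters: [], observer: NullObserverSketch::INSTANCE)
    @processor = processor
    @filters = filters.freeze # Object#freeze returns self, so no copy is made
    @observer = observer || NullObserverSketch::INSTANCE
  end
end

my_filters = []
pipeline = PipelineSketch.new(processor: :noop, filters: my_filters, observer: nil)

puts my_filters.frozen?                                     # true
puts pipeline.filters.equal?(my_filters)                    # true
puts pipeline.observer.equal?(NullObserverSketch::INSTANCE) # true
```

A caller that needs to keep mutating its own filter list after building a pipeline could pass `my_filters.dup` instead.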
Instance Attribute Details
#filters ⇒ #process, ... (readonly)
    # File 'lib/cdc/core/pipeline.rb', line 14
    def filters
      @filters
    end
#observer ⇒ #process, ... (readonly)
    # File 'lib/cdc/core/pipeline.rb', line 14
    def observer
      @observer
    end
#processor ⇒ #process, ... (readonly)
    # File 'lib/cdc/core/pipeline.rb', line 14
    def processor
      @processor
    end
Instance Method Details
#process(event) ⇒ ProcessorResult
Process one event through the pipeline.
    # File 'lib/cdc/core/pipeline.rb', line 31
    def process(event)
      observer.dispatch_started(event)
      return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event)

      result = normalize_result(processor.process(event), event)
      observe_result(result)
      result
    rescue StandardError => e
      result = ProcessorResult.failure(e, event:, processor: processor.class.name)
      observer.dispatch_failed(result)
      result
    end
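The order of observer notifications in #process can be traced with a recording observer. The sketch below is a simplified mirror of the method, with results reduced to plain symbols. `RecordingObserver` and `sketch_process` are hypothetical. Only `dispatch_started` and `dispatch_failed` are taken from the listing, and the real observer interface may require further methods that the private `observe_result` helper calls.

```ruby
# Hypothetical recorder; only dispatch_started and dispatch_failed appear in the source.
class RecordingObserver
  attr_reader :calls

  def initialize
    @calls = []
  end

  def dispatch_started(_event)
    @calls << :started
  end

  def dispatch_failed(_result)
    @calls << :failed
  end
end

# Simplified mirror of #process: results are plain symbols here.
def sketch_process(event, observer:, filter:, processor:)
  observer.dispatch_started(event)
  return :skipped unless filter.call(event)

  processor.call(event)
  :success
rescue StandardError
  # A method-level rescue covers the whole body, not only the processor call.
  observer.dispatch_failed(:failure)
  :failure
end

observer   = RecordingObserver.new
reject_all = ->(_event) { false }
accept_all = ->(_event) { true }
boom       = ->(_event) { raise ArgumentError, 'bad event' }

skipped = sketch_process({ id: 1 }, observer: observer, filter: reject_all, processor: boom)
failed  = sketch_process({ id: 2 }, observer: observer, filter: accept_all, processor: boom)

p observer.calls # [:started, :started, :failed]
```

In this sketch a filtered event triggers only the start notification, and a raising processor triggers the start notification followed by the failure notification. Because the `rescue` is attached to the method body, an error raised by a filter would be captured the same way.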
#process_many(events) ⇒ Array<ProcessorResult>
Process many events in order.
    # File 'lib/cdc/core/pipeline.rb', line 48
    def process_many(events)
      events.map { |event| process(event) }.freeze
    end
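Because each event goes through a call that captures its own errors, one failing event does not abort the batch, and the returned array is frozen. The following sketch shows the same `map { ... }.freeze` shape with a hypothetical `process_one` method standing in for Pipeline#process; the string failure value is an assumption made for brevity.

```ruby
# Hypothetical per-event handler standing in for Pipeline#process.
def process_one(event)
  Integer(event) * 2
rescue ArgumentError => e
  "failure: #{e.class}"
end

results = %w[1 oops 3].map { |event| process_one(event) }.freeze

p results         # [2, "failure: ArgumentError", 6]
p results.frozen? # true
```

Note that `freeze` here is shallow: the array cannot gain or lose elements, but the elements themselves are only immutable if they were frozen individually.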