Class: CDC::Core::Pipeline

Inherits:
Object
Defined in:
lib/cdc/core/pipeline.rb

Overview

Connects filters with a processor to form an event-processing unit.

A Pipeline first evaluates all filters. Events that match every filter are handed to the processor; events rejected by a filter produce skipped results with the reason 'filtered'. Processor errors are captured as failure results instead of escaping to the caller.
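The flow described above can be sketched with stand-in classes. Everything named here is hypothetical: `SketchResult`, `SketchPipeline`, the lambda filters, and the `status` field are illustrative assumptions, not the CDC::Core API. Only the ordering (filters first, then the processor, with errors captured) is taken from the overview.

```ruby
# Stand-in result type (hypothetical; the real class is ProcessorResult).
SketchResult = Struct.new(:status, :event, :detail)

# Minimal stand-in for the flow described in the overview.
class SketchPipeline
  def initialize(processor:, filters: [])
    @processor = processor
    @filters = filters
  end

  def process(event)
    # Every filter must match; otherwise the event is skipped.
    return SketchResult.new(:skipped, event, 'filtered') unless @filters.all? { |f| f.call(event) }

    SketchResult.new(:success, event, @processor.call(event))
  rescue StandardError => e
    # Processor errors become failure results instead of propagating.
    SketchResult.new(:failure, event, e.message)
  end
end

only_inserts = ->(event) { event[:op] == :insert }
processor = lambda do |event|
  raise ArgumentError, 'missing id' unless event.key?(:id)

  event[:id] * 2
end

pipeline = SketchPipeline.new(processor: processor, filters: [only_inserts])
puts pipeline.process({ op: :insert, id: 21 }).status # success
puts pipeline.process({ op: :delete, id: 1 }).status  # skipped
puts pipeline.process({ op: :insert }).status         # failure
```

In this sketch the caller never needs its own `rescue`: all three outcomes come back as ordinary return values.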

Constructor Details

#initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline

Build a pipeline.

Parameters:

  • processor (#process)

    processor for matching events

  • filters (Array<Filter>) (defaults to: [])

    filters applied before processing

  • observer (Observer, nil) (defaults to: NullObserver::INSTANCE)

    instrumentation observer



# File 'lib/cdc/core/pipeline.rb', line 21

def initialize(processor:, filters: [], observer: NullObserver::INSTANCE)
  @processor = processor
  @filters = filters.freeze
  @observer = observer || NullObserver::INSTANCE
end
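
Two consequences of the constructor body are easy to miss: `filters.freeze` freezes the very array the caller passed in, and an explicit `observer: nil` falls back to the default. The sketch below mirrors the constructor with stand-in names (`ConstructorSketch` and `SketchNullObserver` are hypothetical, not part of the library) so it runs on its own.

```ruby
# Hypothetical stand-in for NullObserver; the real one lives in the library.
class SketchNullObserver
  INSTANCE = new.freeze
end

# Mirrors the constructor body shown above, using stand-in names.
class ConstructorSketch
  attr_reader :processor, :filters, :observer

  def initialize(processor:, filters: [], observer: SketchNullObserver::INSTANCE)
    @processor = processor
    @filters = filters.freeze # freezes the array that was passed in
    @observer = observer || SketchNullObserver::INSTANCE
  end
end

my_filters = [->(event) { event[:op] == :insert }]
pipeline = ConstructorSketch.new(processor: ->(event) { event }, filters: my_filters, observer: nil)

puts my_filters.frozen?                                      # true: the caller's array is frozen too
puts pipeline.filters.equal?(my_filters)                     # true: same object, not a copy
puts pipeline.observer.equal?(SketchNullObserver::INSTANCE)  # true: nil falls back to the default
```

A caller that still needs to mutate its own filter list afterwards could pass `my_filters.dup` instead.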

Instance Attribute Details

#filters ⇒ Array<Filter> (readonly)

Returns:

  • (Array<Filter>)

    filters that must all match before processing



# File 'lib/cdc/core/pipeline.rb', line 14

def filters
  @filters
end

#observer ⇒ Observer (readonly)

Returns:

  • (Observer)

    observer notified of dispatch events



# File 'lib/cdc/core/pipeline.rb', line 14

def observer
  @observer
end

#processor ⇒ #process (readonly)

Returns:

  • (#process)

    processor invoked for matching events



# File 'lib/cdc/core/pipeline.rb', line 14

def processor
  @processor
end

Instance Method Details

#process(event) ⇒ ProcessorResult

Process one event through the pipeline.

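Parameters:

  • event (ChangeEvent)

    event to process

Returns:

  • (ProcessorResult)

    the processor's result, a skipped result if a filter rejects the event, or a failure result if an error is raised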



# File 'lib/cdc/core/pipeline.rb', line 31

def process(event)
  observer.dispatch_started(event)
  return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event)

  result = normalize_result(processor.process(event), event)
  observe_result(result)
  result
rescue StandardError => e
  result = ProcessorResult.failure(e, event:, processor: processor.class.name)
  observer.dispatch_failed(result)
  result
end

#process_many(events) ⇒ Array<ProcessorResult>

Process many events in order.

Parameters:

  • events (Enumerable<ChangeEvent>)

    events to process

Returns:

  • (Array<ProcessorResult>)

    frozen array with one result per event, in input order



# File 'lib/cdc/core/pipeline.rb', line 48

def process_many(events)
  events.map { |event| process(event) }.freeze
end
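
Because `process` converts errors into results, one bad event does not abort the rest of a batch. The sketch below illustrates that with a stand-in class; `BatchSketch` and its `[status, value]` pairs are hypothetical simplifications, not the library's ProcessorResult.

```ruby
# Hypothetical stand-in: process wraps errors, process_many maps in order.
class BatchSketch
  def initialize(&processor)
    @processor = processor
  end

  def process(event)
    [:success, @processor.call(event)]
  rescue StandardError => e
    [:failure, e.message]
  end

  def process_many(events)
    events.map { |event| process(event) }.freeze
  end
end

pipeline = BatchSketch.new { |n| Integer(n) }
results = pipeline.process_many(['1', 'oops', '3'])

puts results.map(&:first).inspect # [:success, :failure, :success]
puts results.frozen?              # true
```

Note that `freeze` is shallow: it prevents adding or removing entries in the returned array, but says nothing about whether each individual result is itself immutable.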