Class: CDC::Core::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/core/pipeline.rb

Overview

Connects filters with one processor to form a guarded processing unit.

A Pipeline evaluates all filters before invoking its processor. Matching inputs are processed, while filtered inputs produce skipped results. Use CompositeProcessor for fan-out to many processors and ProcessorChain for dependent step-by-step workflows.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(processor:, filters: [], observer: NullObserver::INSTANCE) ⇒ Pipeline

Build a pipeline.

Parameters:

  • processor (#process)

    processor for matching events

  • filters (Array<Filter>) (defaults to: [])

    filters applied before processing

  • observer (Observer, nil) (defaults to: NullObserver::INSTANCE)

    instrumentation observer



22
23
24
25
26
# File 'lib/cdc/core/pipeline.rb', line 22

def initialize(processor:, filters: [], observer: NullObserver::INSTANCE)
  @processor = processor
  @filters = filters.freeze
  @observer = observer || NullObserver::INSTANCE
end

Instance Attribute Details

#filters#process, ... (readonly)

Returns:

  • (#process)

    processor invoked for matching events

  • (Array<Filter>)

    filters that must all match before processing

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/pipeline.rb', line 15

def filters
  @filters
end

#observer#process, ... (readonly)

Returns:

  • (#process)

    processor invoked for matching events

  • (Array<Filter>)

    filters that must all match before processing

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/pipeline.rb', line 15

def observer
  @observer
end

#processor#process, ... (readonly)

Returns:

  • (#process)

    processor invoked for matching events

  • (Array<Filter>)

    filters that must all match before processing

  • (Observer)

    observer notified of dispatch events



15
16
17
# File 'lib/cdc/core/pipeline.rb', line 15

def processor
  @processor
end

Instance Method Details

#process(event) ⇒ ProcessorResult

Process one event through the pipeline.

Parameters:

Returns:



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/cdc/core/pipeline.rb', line 32

def process(event)
  observer.dispatch_started(event)
  return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event)

  result = normalize_result(processor.process(event), event)
  observe_result(result)
  result
rescue StandardError => e
  result = ProcessorResult.failure(e, event:, processor: processor.class.name)
  observer.dispatch_failed(result)
  result
end

#process_many(events) ⇒ Array<ProcessorResult>

Process many events in order.

Parameters:

  • events (Enumerable<ChangeEvent>)

    events to process

Returns:



49
50
51
# File 'lib/cdc/core/pipeline.rb', line 49

def process_many(events)
  events.map { |event| process(event) }.freeze
end