Class: CDC::Core::Pipeline

Inherits:
Object
Defined in:
lib/cdc/core/pipeline.rb

Overview

Connects filters with a processor to form an event-processing unit.

A Pipeline evaluates its filters first. Events that match every filter are handed to the processor; events rejected by a filter produce skipped results whose metadata carries the reason 'filtered'. A StandardError raised while processing is captured as a failure result instead of propagating to the caller.
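The flow described above can be sketched with stand-in classes. Everything in this block is hypothetical: the `Result` struct, the `TableFilter#match?` interface, the `UpcaseProcessor`, and the hash-shaped events are assumptions made for illustration and are not the library's own `ProcessorResult`, `Filter`, or `ChangeEvent`.

```ruby
# Stand-in sketch: these are NOT the real CDC::Core classes.
# Result, TableFilter#match? and hash-shaped events are assumptions for illustration.
module OverviewSketch
  Result = Struct.new(:status, :event, :value, :error, :metadata, keyword_init: true)

  # Hypothetical filter: matches events from a single table.
  TableFilter = Struct.new(:table) do
    def match?(event)
      event[:table] == table
    end
  end

  # Hypothetical processor: anything that responds to #process.
  class UpcaseProcessor
    def process(event)
      event.fetch(:name).upcase # raises KeyError when :name is missing
    end
  end

  class Pipeline
    def initialize(processor:, filters: [])
      @processor = processor
      @filters = filters
    end

    def process(event)
      # Filters run first; a rejected event yields a skipped result.
      unless @filters.all? { |filter| filter.match?(event) }
        return Result.new(status: :skipped, event: event, metadata: { reason: 'filtered' })
      end

      Result.new(status: :success, event: event, value: @processor.process(event))
    rescue StandardError => e
      # Errors become failure results instead of reaching the caller.
      Result.new(status: :failure, event: event, error: e)
    end
  end
end

pipeline = OverviewSketch::Pipeline.new(
  processor: OverviewSketch::UpcaseProcessor.new,
  filters: [OverviewSketch::TableFilter.new('users')]
)

matched  = pipeline.process({ table: 'users', name: 'ada' })  # handed to the processor
filtered = pipeline.process({ table: 'orders', name: 'bob' }) # rejected by the filter
failed   = pipeline.process({ table: 'users' })               # processor raises KeyError
```

In this sketch the caller always receives a result object and inspects its status, rather than wrapping each call in its own `begin`/`rescue`.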

Constructor Details

#initialize(processor:, filters: []) ⇒ Pipeline

Build a pipeline.

Parameters:

  • processor (#process)

    processor for matching events

  • filters (Array<Filter>) (defaults to: [])

    filters applied before processing



# File 'lib/cdc/core/pipeline.rb', line 19

def initialize(processor:, filters: [])
  @processor = processor
  @filters = filters.freeze
end
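One consequence of `filters.freeze` in the listing above is worth spelling out: in Ruby, `freeze` returns the receiver without copying it, so the array the caller passes in is itself frozen. The sketch below uses a stand-in class with the same constructor body (not the real `CDC::Core::Pipeline`) and symbols in place of filter objects.

```ruby
# Stand-in with the same constructor body as the listing; not the real class.
module ConstructorSketch
  class Pipeline
    attr_reader :filters

    def initialize(processor:, filters: [])
      @processor = processor
      @filters = filters.freeze
    end
  end
end

shared = [:only_users] # symbols stand in for Filter objects
pipeline = ConstructorSketch::Pipeline.new(processor: nil, filters: shared)

same_object   = pipeline.filters.equal?(shared) # freeze returns the receiver, no copy is made
caller_frozen = shared.frozen?

# Appending to the caller's array now raises FrozenError.
append_error =
  begin
    shared << :another
    nil
  rescue FrozenError => e
    e.class
  end

# Passing a copy keeps the caller's array mutable.
mine = [:only_users]
ConstructorSketch::Pipeline.new(processor: nil, filters: mine.dup)
still_mutable = !mine.frozen?
```

Callers that want to keep modifying their own list after building a pipeline can pass `filters.dup`, as the last lines show.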

Instance Attribute Details

#filters ⇒ Array<Filter> (readonly)

Returns:

  • (Array<Filter>)

    filters that must all match before processing



# File 'lib/cdc/core/pipeline.rb', line 13

def filters
  @filters
end

#processor ⇒ #process (readonly)

Returns:

  • (#process)

    processor invoked for matching events



# File 'lib/cdc/core/pipeline.rb', line 13

def processor
  @processor
end

Instance Method Details

#process(event) ⇒ ProcessorResult

Process one event through the pipeline.

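Parameters:

  • event (ChangeEvent)

    event to process

Returns:

  • (ProcessorResult)

    skipped result when the event is filtered out, failure result when a StandardError is raised, otherwise the normalized processor result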



# File 'lib/cdc/core/pipeline.rb', line 28

def process(event)
  return ProcessorResult.skipped(event, metadata: { reason: 'filtered' }) unless matches?(event)

  normalize_result(processor.process(event), event)
rescue StandardError => e
  ProcessorResult.failure(e, event:)
end
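The `rescue StandardError` clause in the listing is attached to the method as a whole, so it appears to guard the filter check as well as the processor call. It also means that exceptions outside the `StandardError` hierarchy are not converted into failure results. The sketch below isolates that Ruby behavior with a hypothetical `guarded` helper; it does not use the library's classes.

```ruby
module RescueSketch
  # Same shape as a method-level `rescue StandardError`: the clause guards the whole body.
  def self.guarded
    yield
  rescue StandardError => e
    "captured #{e.class}"
  end
end

# ArgumentError is a StandardError, so it is captured and turned into a value.
captured = RescueSketch.guarded { raise ArgumentError, 'bad payload' }

# NotImplementedError descends from ScriptError, not StandardError, so it escapes.
escaped =
  begin
    RescueSketch.guarded { raise NotImplementedError, 'not a StandardError' }
  rescue NotImplementedError => e
    e.class
  end
```

A processor that signals an unimplemented branch with `NotImplementedError` would therefore still raise to the caller under this pattern; raising a `StandardError` subclass instead keeps it inside the result flow.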

#process_many(events) ⇒ Array<ProcessorResult>

Process many events in order.

Parameters:

  • events (Enumerable<ChangeEvent>)

    events to process

Returns:

  • (Array<ProcessorResult>)

    frozen array with one result per event, in input order



# File 'lib/cdc/core/pipeline.rb', line 40

def process_many(events)
  events.map { |event| process(event) }.freeze
end
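Because each event goes through `process`, which returns a result instead of raising, one bad event should not stop the rest of the batch. A minimal sketch of that behavior follows, assuming a stand-in `process` that parses strings as integers and returns the error object on failure; the real method returns `ProcessorResult` instances.

```ruby
module BatchSketch
  # Stand-in for Pipeline#process: errors are returned, not raised.
  def self.process(event)
    Integer(event) # raises ArgumentError for non-numeric strings
  rescue StandardError => e
    e
  end

  # Same body as the listing: map in order, then freeze the resulting array.
  def self.process_many(events)
    events.map { |event| process(event) }.freeze
  end
end

results = BatchSketch.process_many(%w[1 oops 3])

first_ok      = results[0] == 1
middle_failed = results[1].is_a?(ArgumentError)
last_ok       = results[2] == 3
frozen        = results.frozen?
```

The returned array is frozen, so callers that need to append or reorder results should work on a copy such as `results.dup`.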