Class: CDC::Core::ProcessorChain

Inherits:
Processor show all
Defined in:
lib/cdc/core/processor_chain.rb

Overview

Sequential processor workflow where each successful result feeds the next processor.

ProcessorChain models dependent workflows. Unlike CompositeProcessor, which sends the same input to every processor, ProcessorChain sends the value returned by one processor to the next processor. This is useful for downstream workflows such as loading records, enriching them, and then sending the enriched payload to a sink.

Each processor result is normalized into a ProcessorResult. The chain stops at the first failure or skipped result because later processors depend on the previous processor’s successful value.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Processor

#flush, #healthy?, ractor_safe!, ractor_safe?, #ractor_safe?, #start, #stop

Constructor Details

#initialize(processors, observer: NullObserver::INSTANCE) ⇒ ProcessorChain

Build a processor chain.

Parameters:

  • processors (Array<#process>)

    processors executed in dependency order

  • observer (Observer, nil) (defaults to: NullObserver::INSTANCE)

    instrumentation observer for each processor result



25
26
27
28
# File 'lib/cdc/core/processor_chain.rb', line 25

def initialize(processors, observer: NullObserver::INSTANCE) # rubocop:disable Lint/MissingSuper
  @processors = processors.freeze
  @observer = observer || NullObserver::INSTANCE
end

Instance Attribute Details

#observerArray<#process>, Observer (readonly)

Returns:

  • (Array<#process>)

    processors executed in dependency order

  • (Observer)

    observer notified of dispatch events



19
20
21
# File 'lib/cdc/core/processor_chain.rb', line 19

def observer
  @observer
end

#processorsArray<#process>, Observer (readonly)

Returns:

  • (Array<#process>)

    processors executed in dependency order

  • (Observer)

    observer notified of dispatch events



19
20
21
# File 'lib/cdc/core/processor_chain.rb', line 19

def processors
  @processors
end

Instance Method Details

#process(input) ⇒ ProcessorResult

Process one input through each processor in sequence.

The first processor receives the original input. Each later processor receives the previous successful ProcessorResult#value. The returned value is the final ProcessorResult produced by the chain.

Parameters:

  • input (Object)

    initial input for the first processor

Returns:

  • (ProcessorResult)

    final processor result or the first failed/skipped result



38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/cdc/core/processor_chain.rb', line 38

def process(input)
  observer.dispatch_started(input)
  current_input = input

  processors.each do |processor|
    result = process_with(processor, current_input)
    observe_result(result)
    return result unless result.success?

    current_input = result.value
  end

  ProcessorResult.success(current_input, value: current_input)
end

#process_many(inputs) ⇒ Array<ProcessorResult>

Process many inputs in order.

Parameters:

  • inputs (Enumerable<Object>)

    inputs to process through the chain

Returns:



57
58
59
# File 'lib/cdc/core/processor_chain.rb', line 57

def process_many(inputs)
  inputs.map { |input| process(input) }.freeze
end