Class: Whodunit::Chronicles::CompositeProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/whodunit/chronicles/composite_processor.rb

Overview

Fans out a single change event to multiple processors in sequence.

Use this to build pipelines — e.g. simultaneously storing audit records, streaming to Grafana, and triggering alerts — without coupling any single processor to the others.

Each processor runs independently. If one raises, the error is logged and the remaining processors still execute (fail-open by default). Set fail_fast: true to instead halt the chain on the first error.

Examples:

Basic pipeline

service = Whodunit::Chronicles::Service.new(
  adapter: adapter,
  processor: Whodunit::Chronicles::CompositeProcessor.new([
    AuditStoreProcessor.new,
    AlertingProcessor.new,
    GrafanaProcessor.new
  ])
)

Halt on first error

CompositeProcessor.new(processors, fail_fast: true)

Instance Method Summary collapse

Constructor Details

#initialize(processors, fail_fast: false, logger: nil) ⇒ CompositeProcessor

Returns a new instance of CompositeProcessor.

Parameters:

  • processors (Array<#process>)

    ordered list of processors to invoke

  • fail_fast (Boolean) (defaults to: false)

    when true, halt the chain on the first error

  • logger (Logger, nil) (defaults to: nil)

    optional logger; defaults to Chronicles logger

Raises:

  • (ArgumentError)


32
33
34
35
36
37
38
39
# File 'lib/whodunit/chronicles/composite_processor.rb', line 32

def initialize(processors, fail_fast: false, logger: nil)
  raise ArgumentError, 'processors must be an Array' unless processors.is_a?(Array)
  raise ArgumentError, 'processors cannot be empty' if processors.empty?

  @processors = processors
  @fail_fast  = fail_fast
  @logger     = logger || Whodunit::Chronicles.logger
end

Instance Method Details

#add(processor) ⇒ self Also known as: <<

Append a processor to the end of the chain.

Parameters:

  • processor (#process)

    the processor to add

Returns:

  • (self)


79
80
81
82
# File 'lib/whodunit/chronicles/composite_processor.rb', line 79

def add(processor)
  @processors << processor
  self
end

#process(change_event) ⇒ void

This method returns an undefined value.

Process a change event through every processor in the chain.

Parameters:

Raises:



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/whodunit/chronicles/composite_processor.rb', line 46

def process(change_event)
  errors = []

  @processors.each do |processor|
    processor.process(change_event)
  rescue StandardError => e
    raise ProcessingError, "#{processor.class} failed: #{e.message}" if @fail_fast

    @logger.error { "CompositeProcessor: #{processor.class} raised #{e.class}: #{e.message}" }
    errors << e
  end

  return if errors.empty?

  @logger.warn do
    "CompositeProcessor: #{errors.size} processor(s) failed for #{change_event.table_name}##{change_event.action}"
  end
end

#processor_classesArray<Class>

Returns the processor classes in chain order.

Returns:

  • (Array<Class>)

    the processor classes in chain order



71
72
73
# File 'lib/whodunit/chronicles/composite_processor.rb', line 71

def processor_classes
  @processors.map(&:class)
end

#sizeInteger

Returns the number of processors in the chain.

Returns:

  • (Integer)

    the number of processors in the chain



66
67
68
# File 'lib/whodunit/chronicles/composite_processor.rb', line 66

def size
  @processors.size
end