Class: Julewire::Core::Processing::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/core/processing/pipeline.rb

Overview

Pipeline owns normalization, processors, and destinations, so RuboCop's Metrics/ClassLength cop is disabled for this class.

Constant Summary collapse

# Symbol names of the counters this pipeline knows about.
# NOTE(review): presumably these are the keys reported under `counts` in #health — confirm.
COUNTER_KEYS = %i[
  callback_error entered level_dropped
  no_output_dropped processor_dropped processor_error
].freeze

Instance Method Summary collapse

Constructor Details

#initialize(configuration:, invalid_severity_reporter: Diagnostics::InvalidSeverityReporter.counter) ⇒ Pipeline

Integration-facing raw/normalized record pipeline.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/julewire/core/processing/pipeline.rb', line 20

# Builds the integration-facing raw/normalized record pipeline from +configuration+.
#
# @param configuration [Object] read for on_drop, on_failure, labels and
#   error_backtrace_lines, and handed to the build_* helpers
# @param invalid_severity_reporter [Object] stored as-is; defaults to
#   Diagnostics::InvalidSeverityReporter.counter
def initialize(configuration:, invalid_severity_reporter: Diagnostics::InvalidSeverityReporter.counter)
  @on_drop = configuration.on_drop
  @on_failure = configuration.on_failure
  @invalid_severity_reporter = invalid_severity_reporter
  @destinations = build_destinations(configuration)
  # Labels are converted to a Hash and run through FieldSet.deep_symbolize_keys before being stored.
  @labels = Fields::FieldSet.deep_symbolize_keys(configuration.labels.to_h)
  @threshold = build_threshold(configuration)
  @error_backtrace_lines = configuration.error_backtrace_lines
  # on_failure is optional (allow_nil: true); a non-nil value goes through the callable validation.
  # NOTE(review): this check runs after destinations and threshold are built — confirm that order is intended.
  Validation.validate_callable!(configuration.on_failure, name: :on_failure, allow_nil: true)
  @processor_chain = build_processor_chain(configuration)
  # Emptiness of the processor chain and labels is computed once and cached.
  # NOTE(review): presumably so the emit path can skip that work cheaply — confirm.
  @processors_empty = @processor_chain.empty?
  @labels_empty = @labels.empty?
  initialize_tracking
end

Instance Method Details

#after_fork! ⇒ Object



68
69
70
71
72
# File 'lib/julewire/core/processing/pipeline.rb', line 68

# Re-runs tracking initialization and then forwards after_fork! to the destinations.
#
# @return [self] the receiver, so calls can be chained
def after_fork!
  tap do
    initialize_tracking
    @destinations.after_fork!
  end
end

#close(timeout: nil, skip_resource_identities: nil) ⇒ Object



78
79
80
# File 'lib/julewire/core/processing/pipeline.rb', line 78

# Forwards close to the destinations with the given options.
#
# @param timeout [Object, nil] passed through unchanged as +timeout:+
# @param skip_resource_identities [Object, nil] passed through unchanged
# @return [Object] whatever the destinations' close returns
def close(timeout: nil, skip_resource_identities: nil)
  shutdown_options = { timeout: timeout, skip_resource_identities: skip_resource_identities }
  @destinations.close(**shutdown_options)
end

#emit(input = Core::UNSET, **fields, &) ⇒ Object



35
36
37
# File 'lib/julewire/core/processing/pipeline.rb', line 35

# Emits +input+ and +fields+ through emit_with_level_check with the level flag set to true.
#
# @param input [Object] positional input; Core::UNSET when omitted
# @param fields [Hash] keyword fields, forwarded as a single hash
# @return [Object] whatever emit_with_level_check returns
def emit(input = Core::UNSET, **fields, &)
  enforce_level = true
  emit_with_level_check(input, enforce_level, fields, &)
end

#emit_integration(input, enforce_level: true) ⇒ Object

Trusted integration path for adapter-owned input hashes.



44
45
46
47
48
# File 'lib/julewire/core/processing/pipeline.rb', line 44

# Trusted integration path for adapter-owned input hashes: the draft is built
# with input_owned: true inside the emit_input_with_guard block.
#
# @param input [Object] adapter-owned input
# @param enforce_level [Boolean] forwarded to emit_input_with_guard
# @return [Object] whatever emit_input_with_guard returns
def emit_integration(input, enforce_level: true)
  guard_options = { enforce_level: enforce_level, lazy: false }
  emit_input_with_guard(input, **guard_options) { build_draft(input, input_owned: true) }
end

#emit_isolated_input(input, enforce_level: true) ⇒ Object

Runtime summaries already carry their captured scope fields.



51
52
53
54
55
# File 'lib/julewire/core/processing/pipeline.rb', line 51

# Emits input whose draft is built by build_isolated_draft; runtime summaries
# already carry their captured scope fields.
#
# @param input [Object] input handed to the guard and to build_isolated_draft
# @param enforce_level [Boolean] forwarded to emit_input_with_guard
# @return [Object] whatever emit_input_with_guard returns
def emit_isolated_input(input, enforce_level: true)
  guard_options = { enforce_level: enforce_level, lazy: false }
  emit_input_with_guard(input, **guard_options) { build_isolated_draft(input) }
end

#emit_record(record, enforce_level: true) ⇒ Object

Trusted normalized-record path used by runtime summaries and bridge envelopes.



58
59
60
61
62
63
64
65
66
# File 'lib/julewire/core/processing/pipeline.rb', line 58

# Trusted normalized-record path used by runtime summaries and bridge envelopes.
#
# Returns nil without emitting when no_output_dropped? is truthy. Any
# StandardError raised along the way (including by that check) is handed to
# record_emit_failure together with the record instead of propagating.
#
# @param record [Object] the normalized record to emit
# @param enforce_level [Boolean] forwarded to emit_validated_record
# @return [Object, nil] result of finish_emit_attempt, or of record_emit_failure on error
def emit_record(record, enforce_level: true)
  return if no_output_dropped?

  # Capture the marker before emitting so finish_emit_attempt receives the pre-emit value.
  marker_before_emit = @health.degradation_marker
  emit_validated_record(record, enforce_level: enforce_level)
  finish_emit_attempt(marker_before_emit)
rescue StandardError => error
  record_emit_failure(error, record)
end

#emit_without_level(input = Core::UNSET, **fields, &) ⇒ Object



39
40
41
# File 'lib/julewire/core/processing/pipeline.rb', line 39

# Emits +input+ and +fields+ through emit_with_level_check with the level flag set to false.
#
# @param input [Object] positional input; Core::UNSET when omitted
# @param fields [Hash] keyword fields, forwarded as a single hash
# @return [Object] whatever emit_with_level_check returns
def emit_without_level(input = Core::UNSET, **fields, &)
  enforce_level = false
  emit_with_level_check(input, enforce_level, fields, &)
end

#flush(timeout: nil) ⇒ Object



74
75
76
# File 'lib/julewire/core/processing/pipeline.rb', line 74

# Forwards flush to the destinations.
#
# @param timeout [Object, nil] passed through unchanged as +timeout:+
# @return [Object] whatever the destinations' flush returns
def flush(timeout: nil)
  destinations = @destinations
  destinations.flush(timeout: timeout)
end

#health ⇒ Object



86
87
88
89
90
91
92
93
94
95
# File 'lib/julewire/core/processing/pipeline.rb', line 86

# Assembles a health report for the pipeline.
#
# @return [Hash] with keys :configured (true when destinations are not empty),
#   :counts, :destinations, :last_callback_failure, :last_failure and :status,
#   in that order
def health
  report = { configured: !@destinations.empty? }
  report[:counts] = pipeline_counts_snapshot
  report[:destinations] = @destinations.health
  report[:last_callback_failure] = @health.last_callback_failure
  report[:last_failure] = @health.last_failure
  report[:status] = pipeline_status
  report
end

#lifecycle_resource_identities ⇒ Object



82
83
84
# File 'lib/julewire/core/processing/pipeline.rb', line 82

# Returns the lifecycle resource identities reported by the destinations.
#
# @return [Object] whatever the destinations' lifecycle_resource_identities returns
def lifecycle_resource_identities
  destinations = @destinations
  destinations.lifecycle_resource_identities
end