Class: Julewire::Core::Processing::Pipeline
- Inherits: Object
- Defined in: lib/julewire/core/processing/pipeline.rb
Overview
Pipeline owns normalization, processors, and destinations.
Constant Summary
- COUNTER_KEYS = %i[callback_error entered level_dropped no_output_dropped processor_dropped processor_error].freeze
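
As a rough illustration of how a fixed key list like COUNTER_KEYS can back a set of counters, the sketch below builds a zeroed hash from the keys and takes a frozen snapshot. The helper name `fresh_counters` and the snapshot step are assumptions made for this example; the page does not show how the pipeline stores or snapshots its counts.

```ruby
# Copied from the constant listed above.
COUNTER_KEYS = %i[
  callback_error entered level_dropped no_output_dropped processor_dropped processor_error
].freeze

# Hypothetical helper: one zeroed slot per counter key.
def fresh_counters
  COUNTER_KEYS.to_h { |key| [key, 0] }
end

counters = fresh_counters
counters[:entered] += 1
counters[:level_dropped] += 1

# Hypothetical snapshot: a frozen copy so readers cannot mutate live counts.
snapshot = counters.dup.freeze
```

Deriving the hash from the constant keeps every counter present even when its value is zero, which makes snapshots easier to compare.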
Instance Method Summary
- #after_fork! ⇒ Object
- #close(timeout: nil, skip_resource_identities: nil) ⇒ Object
- #emit(input = Core::UNSET, **fields) ⇒ Object
- #emit_integration(input, enforce_level: true) ⇒ Object
  Trusted integration path for adapter-owned input hashes.
- #emit_isolated_input(input, enforce_level: true) ⇒ Object
  Runtime summaries already carry their captured scope fields.
- #emit_record(record, enforce_level: true) ⇒ Object
  Trusted normalized-record path used by runtime summaries and bridge envelopes.
- #emit_without_level(input = Core::UNSET, **fields) ⇒ Object
- #flush(timeout: nil) ⇒ Object
- #health ⇒ Object
- #initialize(configuration:, invalid_severity_reporter: Diagnostics::InvalidSeverityReporter.counter) ⇒ Pipeline (constructor)
  Integration-facing raw/normalized record pipeline.
- #lifecycle_resource_identities ⇒ Object
Constructor Details
#initialize(configuration:, invalid_severity_reporter: Diagnostics::InvalidSeverityReporter.counter) ⇒ Pipeline
Integration-facing raw/normalized record pipeline.
    # File 'lib/julewire/core/processing/pipeline.rb', line 20
    def initialize(configuration:, invalid_severity_reporter: Diagnostics::InvalidSeverityReporter.counter)
      @on_drop = configuration.on_drop
      @on_failure = configuration.on_failure
      @invalid_severity_reporter = invalid_severity_reporter
      @destinations = build_destinations(configuration)
      @labels = Fields::FieldSet.deep_symbolize_keys(configuration.labels.to_h)
      @threshold = build_threshold(configuration)
      @error_backtrace_lines = configuration.error_backtrace_lines
      Validation.validate_callable!(configuration.on_failure, name: :on_failure, allow_nil: true)
      @processor_chain = build_processor_chain(configuration)
      @processors_empty = @processor_chain.empty?
      @labels_empty = @labels.empty?
      initialize_tracking
    end
Instance Method Details
#after_fork! ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 68
    def after_fork!
      initialize_tracking
      @destinations.after_fork!
      self
    end
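
The listing above re-runs `initialize_tracking` and returns `self`. A minimal sketch of that shape follows, using a stand-in class rather than the real Pipeline. The assumption that `initialize_tracking` resets per-process counters is a guess made for illustration, and `SketchPipeline`, `record_entered`, and `counts` are hypothetical names.

```ruby
# Hypothetical stand-in, not the real Julewire pipeline.
class SketchPipeline
  attr_reader :counts

  def initialize
    initialize_tracking
  end

  # Hypothetical: bump a counter as an emit would.
  def record_entered
    @counts[:entered] += 1
  end

  # Mirrors the listed shape: reset tracking state, then return self.
  def after_fork!
    initialize_tracking
    self
  end

  private

  # Assumption: tracking state is per-process and starts from zero.
  def initialize_tracking
    @counts = Hash.new(0)
  end
end

pipeline = SketchPipeline.new
pipeline.record_entered
same = pipeline.after_fork!
```

Returning `self` lets a fork hook reset the object and keep using it in a single expression.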
#close(timeout: nil, skip_resource_identities: nil) ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 78
    def close(timeout: nil, skip_resource_identities: nil)
      @destinations.close(timeout: timeout, skip_resource_identities: skip_resource_identities)
    end
#emit(input = Core::UNSET, **fields) ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 35
    def emit(input = Core::UNSET, **fields, &)
      emit_with_level_check(input, true, fields, &)
    end
#emit_integration(input, enforce_level: true) ⇒ Object
Trusted integration path for adapter-owned input hashes.
    # File 'lib/julewire/core/processing/pipeline.rb', line 44
    def emit_integration(input, enforce_level: true)
      emit_input_with_guard(input, enforce_level: enforce_level, lazy: false) do
        build_draft(input, input_owned: true)
      end
    end
#emit_isolated_input(input, enforce_level: true) ⇒ Object
Runtime summaries already carry their captured scope fields.
    # File 'lib/julewire/core/processing/pipeline.rb', line 51
    def emit_isolated_input(input, enforce_level: true)
      emit_input_with_guard(input, enforce_level: enforce_level, lazy: false) do
        build_isolated_draft(input)
      end
    end
#emit_record(record, enforce_level: true) ⇒ Object
Trusted normalized-record path used by runtime summaries and bridge envelopes.
    # File 'lib/julewire/core/processing/pipeline.rb', line 58
    def emit_record(record, enforce_level: true)
      return if no_output_dropped?

      degradation_marker = @health.degradation_marker
      emit_validated_record(record, enforce_level: enforce_level)
      finish_emit_attempt(degradation_marker)
    rescue StandardError => e
      record_emit_failure(e, record)
    end
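
The method-level `rescue StandardError` in the listing suggests that a failing emit is recorded rather than raised to the caller. The sketch below shows that pattern in isolation, under stated assumptions: `SketchRecordEmitter`, its lambda destination, and the shape of `last_failure` are hypothetical and not taken from Julewire.

```ruby
# Hypothetical stand-in illustrating "record the failure, do not raise".
class SketchRecordEmitter
  attr_reader :last_failure, :delivered

  # destination: any object responding to #call (an assumption for this sketch).
  def initialize(destination)
    @destination = destination
    @delivered = 0
    @last_failure = nil
  end

  def emit_record(record)
    @destination.call(record)
    @delivered += 1
    nil
  rescue StandardError => e
    # Method-level rescue: the caller never sees the exception.
    record_emit_failure(e, record)
  end

  private

  # Hypothetical failure record; the real structure is not shown on this page.
  def record_emit_failure(error, _record)
    @last_failure = { error_class: error.class.name, message: error.message }
    nil
  end
end

failing = SketchRecordEmitter.new(->(_record) { raise ArgumentError, "destination unavailable" })
result = failing.emit_record({ message: "hello" })
```

Keeping the rescue at method level means a broken destination cannot take down the code that is doing the logging.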
#emit_without_level(input = Core::UNSET, **fields) ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 39
    def emit_without_level(input = Core::UNSET, **fields, &)
      emit_with_level_check(input, false, fields, &)
    end
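
Both #emit and #emit_without_level hand off to the same private `emit_with_level_check`, differing only in the boolean they pass. A simplified sketch of that delegation is below. The `LEVELS` table, the threshold comparison, and the `written` array are assumptions for illustration, and the stand-in takes a level and a message instead of the real `input`/`fields` arguments.

```ruby
# Hypothetical stand-in showing one shared private method behind two entry points.
class SketchEmitter
  # Assumed severity ordering; not taken from Julewire.
  LEVELS = { debug: 0, info: 1, warn: 2, error: 3 }.freeze

  attr_reader :written

  def initialize(threshold:)
    @threshold = LEVELS.fetch(threshold)
    @written = []
  end

  # Enforces the threshold.
  def emit(level, message)
    emit_with_level_check(level, message, true)
  end

  # Skips the threshold check.
  def emit_without_level(level, message)
    emit_with_level_check(level, message, false)
  end

  private

  def emit_with_level_check(level, message, enforce_level)
    return if enforce_level && LEVELS.fetch(level) < @threshold

    @written << [level, message]
    nil
  end
end

emitter = SketchEmitter.new(threshold: :warn)
emitter.emit(:info, "below threshold")
emitter.emit_without_level(:info, "bypasses threshold")
emitter.emit(:error, "above threshold")
```

With a `:warn` threshold, only the second and third calls leave an entry in `written`.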
#flush(timeout: nil) ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 74
    def flush(timeout: nil)
      @destinations.flush(timeout: timeout)
    end
#health ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 86
    def health
      {
        configured: !@destinations.empty?,
        counts: pipeline_counts_snapshot,
        destinations: @destinations.health,
        last_callback_failure: @health.last_callback_failure,
        last_failure: @health.last_failure,
        status: pipeline_status
      }
    end
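
A caller could turn the hash returned by #health into a one-line summary along these lines. The sample hash is hypothetical: its keys match the listing, but the values, including `status: :ok` and a hash-shaped `counts`, are assumptions, and `health_line` is a helper invented for this sketch.

```ruby
# Hypothetical sample; real values would come from Pipeline#health.
sample_health = {
  configured: true,
  counts: { entered: 10, level_dropped: 3, processor_error: 0 }, # assumed shape
  destinations: {},
  last_callback_failure: nil,
  last_failure: nil,
  status: :ok # assumed value
}

# Hypothetical helper: render status, configured flag, and counts on one line.
def health_line(health)
  counts = health.fetch(:counts).map { |key, value| "#{key}=#{value}" }.join(" ")
  "status=#{health.fetch(:status)} configured=#{health.fetch(:configured)} #{counts}"
end

line = health_line(sample_health)
```

Using `fetch` rather than `[]` makes a missing key fail loudly, which is useful if the hash shape changes between versions.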
#lifecycle_resource_identities ⇒ Object
    # File 'lib/julewire/core/processing/pipeline.rb', line 82
    def lifecycle_resource_identities
      @destinations.lifecycle_resource_identities
    end