Class: Julewire::Core::Destinations::Destination
- Inherits:
-
Object
- Object
- Julewire::Core::Destinations::Destination
- Defined in:
- lib/julewire/core/destinations/destination.rb
Constant Summary collapse
- COUNTER_KEYS =
%i[ callback_error encode_error formatter_error formatted output_accepted output_error output_exception output_rejected processor_dropped processor_error received record_too_large ].freeze
Instance Attribute Summary collapse
- #name ⇒ Object readonly
Instance Method Summary collapse
- #after_fork! ⇒ Object
- #close(timeout: nil) ⇒ Object
- #emit(record) ⇒ Object
- #emit_processed_record(record, degradation_marker:) ⇒ Object
- #flush(timeout: nil) ⇒ Object
- #health ⇒ Object
-
#initialize(name:, close_output:, encoder:, formatter:, max_record_bytes:, on_drop:, on_failure:, output:, error_backtrace_lines: Core::MAX_BACKTRACE_LINES, processors: []) ⇒ Destination
constructor
rubocop:disable Metrics/ParameterLists – Destination definitions pass normalized settings.
- #resource_identity ⇒ Object
Constructor Details
#initialize(name:, close_output:, encoder:, formatter:, max_record_bytes:, on_drop:, on_failure:, output:, error_backtrace_lines: Core::MAX_BACKTRACE_LINES, processors: []) ⇒ Destination
rubocop:disable Metrics/ParameterLists – Destination definitions pass normalized settings.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/julewire/core/destinations/destination.rb', line 24 def initialize( # rubocop:disable Metrics/ParameterLists -- Destination definitions pass normalized settings. name:, close_output:, encoder:, formatter:, max_record_bytes:, on_drop:, on_failure:, output:, error_backtrace_lines: Core::MAX_BACKTRACE_LINES, processors: [] ) @name = Destinations.normalize_name(name) @formatter = validate_callable(formatter, name: :formatter) @encoder = validate_callable(encoder, name: :encoder) Validation.validate_byte_limit!(max_record_bytes, name: :max_record_bytes) @max_record_bytes = max_record_bytes @on_drop = validate_optional_callback(on_drop, name: :on_drop) @on_failure = validate_optional_callback(on_failure, name: :on_failure) raise ArgumentError, "destination #{@name.inspect} output is required" if output.nil? @output = Sink.wrap(output, close_output: close_output) @processor_chain = processor_chain(processors, error_backtrace_lines) initialize_tracking @write_step = build_write_step end |
Instance Attribute Details
#name ⇒ Object (readonly)
22 23 24 |
# File 'lib/julewire/core/destinations/destination.rb', line 22 def name @name end |
Instance Method Details
#after_fork! ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/julewire/core/destinations/destination.rb', line 74 def after_fork! initialize_tracking @output.after_fork! if @output.respond_to?(:after_fork!) self rescue StandardError => e notify_failure( e, action: :after_fork, output_class: output_class_name, phase: :output_lifecycle ) self end |
#close(timeout: nil) ⇒ Object
70 71 72 |
# File 'lib/julewire/core/destinations/destination.rb', line 70 def close(timeout: nil) call_output_lifecycle(:close, timeout: timeout) end |
#emit(record) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/julewire/core/destinations/destination.rb', line 51 def emit(record) degradation_marker = @health.degradation_marker record = process_record(record) return unless record emit_processed_record(record, degradation_marker: degradation_marker) end |
#emit_processed_record(record, degradation_marker:) ⇒ Object
59 60 61 62 63 64 |
# File 'lib/julewire/core/destinations/destination.rb', line 59 def emit_processed_record(record, degradation_marker:) return unless @write_step.call(record) == :accepted clear_degradation_if_unchanged(degradation_marker) nil end |
#flush(timeout: nil) ⇒ Object
66 67 68 |
# File 'lib/julewire/core/destinations/destination.rb', line 66 def flush(timeout: nil) call_output_lifecycle(:flush, timeout: timeout) end |
#health ⇒ Object
94 95 96 97 98 99 100 101 102 |
# File 'lib/julewire/core/destinations/destination.rb', line 94 def health { counts: counts_health, last_callback_failure: @health.last_callback_failure, last_failure: @health.last_failure, last_loss: @health.last_loss, status: degraded? ? :degraded : :ok } end |
#resource_identity ⇒ Object
88 89 90 91 92 |
# File 'lib/julewire/core/destinations/destination.rb', line 88 def resource_identity return @output.resource_identity if @output.respond_to?(:resource_identity) @output end |