Class: Julewire::Core::Destinations::Destination

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/core/destinations/destination.rb

Constant Summary collapse

# Frozen list of counter names tracked for a destination.
# NOTE(review): presumably these are the keys reported under `counts` in
# #health -- confirm against counts_health.
COUNTER_KEYS = %i[
  callback_error
  encode_error
  formatter_error
  formatted
  output_accepted
  output_error
  output_exception
  output_rejected
  processor_dropped
  processor_error
  received
  record_too_large
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, close_output:, encoder:, formatter:, max_record_bytes:, on_drop:, on_failure:, output:, error_backtrace_lines: Core::MAX_BACKTRACE_LINES, processors: []) ⇒ Destination

rubocop:disable Metrics/ParameterLists – Destination definitions pass normalized settings.

Raises:

  • (ArgumentError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/julewire/core/destinations/destination.rb', line 24

# Builds a destination from already-normalized settings.
#
# Arguments are checked in the order written below, so the first invalid
# argument determines which error the caller sees.
#
# @param name destination name; stored after Destinations.normalize_name
# @param close_output forwarded to Sink.wrap as `close_output:`
# @param encoder checked with validate_callable
# @param formatter checked with validate_callable
# @param max_record_bytes checked with Validation.validate_byte_limit!
# @param on_drop checked with validate_optional_callback
# @param on_failure checked with validate_optional_callback
# @param output required; wrapped with Sink.wrap
# @param error_backtrace_lines passed to processor_chain
#   (defaults to Core::MAX_BACKTRACE_LINES)
# @param processors passed to processor_chain (defaults to an empty list)
# @raise [ArgumentError] when output is nil
#   NOTE(review): the validation helpers presumably raise as well -- confirm.
def initialize( # rubocop:disable Metrics/ParameterLists -- Destination definitions pass normalized settings.
  name:,
  close_output:,
  encoder:,
  formatter:,
  max_record_bytes:,
  on_drop:,
  on_failure:,
  output:,
  error_backtrace_lines: Core::MAX_BACKTRACE_LINES,
  processors: []
)
  # @name is assigned first because the "output is required" message below
  # interpolates it.
  @name = Destinations.normalize_name(name)
  @formatter = validate_callable(formatter, name: :formatter)
  @encoder = validate_callable(encoder, name: :encoder)
  Validation.validate_byte_limit!(max_record_bytes, name: :max_record_bytes)
  @max_record_bytes = max_record_bytes
  @on_drop = validate_optional_callback(on_drop, name: :on_drop)
  @on_failure = validate_optional_callback(on_failure, name: :on_failure)
  # Only nil is rejected here; any other value is handed to Sink.wrap.
  raise ArgumentError, "destination #{@name.inspect} output is required" if output.nil?

  @output = Sink.wrap(output, close_output: close_output)
  @processor_chain = processor_chain(processors, error_backtrace_lines)
  # Tracking state is set up before the write step is built.
  # NOTE(review): build_write_step presumably depends on that state -- confirm.
  initialize_tracking
  @write_step = build_write_step
end

Instance Attribute Details

#name ⇒ Object (readonly)



22
23
24
# File 'lib/julewire/core/destinations/destination.rb', line 22

# Returns the destination name stored in @name by the constructor.
def name
  @name
end

Instance Method Details

#after_fork! ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/julewire/core/destinations/destination.rb', line 74

# Re-runs initialize_tracking and, when the wrapped output responds to
# after_fork!, forwards the call to it.
#
# Any StandardError raised along the way is handed to notify_failure
# (action :after_fork, phase :output_lifecycle) instead of propagating.
#
# @return [self] in both the success and the rescued-failure case
def after_fork!
  begin
    initialize_tracking
    @output.after_fork! if @output.respond_to?(:after_fork!)
  rescue StandardError => error
    notify_failure(
      error,
      action: :after_fork,
      output_class: output_class_name,
      phase: :output_lifecycle
    )
  end

  self
end

#close(timeout: nil) ⇒ Object



70
71
72
# File 'lib/julewire/core/destinations/destination.rb', line 70

# Runs the :close lifecycle action by delegating to call_output_lifecycle.
#
# @param timeout forwarded unchanged as `timeout:` (nil by default)
#   NOTE(review): unit and nil semantics are defined by call_output_lifecycle -- confirm.
# @return whatever call_output_lifecycle returns
def close(timeout: nil)
  call_output_lifecycle(:close, timeout: timeout)
end

#emit(record) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/julewire/core/destinations/destination.rb', line 51

# Runs a record through process_record and, if a record comes back, hands
# it to emit_processed_record.
#
# The health degradation marker is read before processing starts and is
# passed along with the processed record.
#
# @param record the record to process and emit
# @return the result of emit_processed_record, or nil when process_record
#   yields nil/false
def emit(record)
  marker_before = @health.degradation_marker
  processed = process_record(record)

  emit_processed_record(processed, degradation_marker: marker_before) if processed
end

#emit_processed_record(record, degradation_marker:) ⇒ Object



59
60
61
62
63
64
# File 'lib/julewire/core/destinations/destination.rb', line 59

# Passes an already-processed record to the write step and, only when the
# step reports :accepted, calls clear_degradation_if_unchanged with the
# supplied marker.
#
# @param record the processed record handed to @write_step
# @param degradation_marker marker forwarded to clear_degradation_if_unchanged
# @return [nil] always
def emit_processed_record(record, degradation_marker:)
  outcome = @write_step.call(record)
  clear_degradation_if_unchanged(degradation_marker) if outcome == :accepted

  nil
end

#flush(timeout: nil) ⇒ Object



66
67
68
# File 'lib/julewire/core/destinations/destination.rb', line 66

# Runs the :flush lifecycle action by delegating to call_output_lifecycle.
#
# @param timeout forwarded unchanged as `timeout:` (nil by default)
#   NOTE(review): unit and nil semantics are defined by call_output_lifecycle -- confirm.
# @return whatever call_output_lifecycle returns
def flush(timeout: nil)
  call_output_lifecycle(:flush, timeout: timeout)
end

#health ⇒ Object



94
95
96
97
98
99
100
101
102
# File 'lib/julewire/core/destinations/destination.rb', line 94

# Builds a health snapshot for this destination.
#
# @return [Hash] with keys, in order: :counts (from counts_health),
#   :last_callback_failure, :last_failure, :last_loss (read from @health)
#   and :status (:degraded when degraded? is truthy, otherwise :ok)
def health
  snapshot = {
    counts: counts_health,
    last_callback_failure: @health.last_callback_failure,
    last_failure: @health.last_failure,
    last_loss: @health.last_loss
  }

  # Status is computed last, after the other fields have been read.
  snapshot[:status] =
    if degraded?
      :degraded
    else
      :ok
    end

  snapshot
end

#resource_identity ⇒ Object



88
89
90
91
92
# File 'lib/julewire/core/destinations/destination.rb', line 88

# Identity of the underlying output resource.
#
# @return the wrapped output's own resource_identity when it responds to
#   that method; otherwise the wrapped output object itself
def resource_identity
  if @output.respond_to?(:resource_identity)
    @output.resource_identity
  else
    @output
  end
end