Module: Julewire::Karafka::EventSeverity

Defined in:
lib/julewire/karafka/event_severity.rb

Constant Summary

# Error types that error_severity maps to :fatal; every other type maps to :error.
FATAL_ERROR_TYPES = %w[runner.call.error swarm.supervisor.error worker.process.error].freeze
# Consumer event names that default_consumer_severity maps to :debug.
DEBUG_CONSUMER_EVENTS = %w[
  connection.listener.fetch_loop statistics.emitted
  swarm.manager.before_fork swarm.manager.control
].freeze
# Consumer event names that default_consumer_severity maps to :error.
ERROR_CONSUMER_EVENTS = %w[swarm.manager.stopping swarm.manager.terminating].freeze
# Producer event names that default_producer_severity maps to :debug.
DEBUG_PRODUCER_EVENTS = %w[oauthbearer.token_refresh statistics.emitted].freeze

Class Method Summary

Class Method Details

.collection_count(value) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/julewire/karafka/event_severity.rb', line 98

# Derives an element count from +value+.
#
# Resolution order:
# 1. a Hash holding a +:count+ or +"count"+ entry yields that entry as-is
#    (even when the stored entry is nil);
# 2. an Integer is taken to be the count itself;
# 3. any other object responding to +size+ yields its size;
# 4. everything else yields nil.
#
# @param value [Object] the value to inspect
# @return [Object, nil] the resolved count, or nil when none can be derived
def collection_count(value)
  if value.is_a?(Hash)
    return value[:count] if value.key?(:count)
    return value["count"] if value.key?("count")
  end

  # Integer#size is the byte width of the number's machine representation
  # (e.g. 8), not a collection size, so an Integer must never reach the
  # generic +size+ branch below.
  return value if value.is_a?(Integer)
  return value.size if value.respond_to?(:size)

  nil
end

.consumer(name, event:, payload:) ⇒ Object



30
31
32
# File 'lib/julewire/karafka/event_severity.rb', line 30

# Resolves the severity for a consumer-side event.
#
# A severity obtained from the payload via payload_severity takes precedence;
# otherwise the result of default_consumer_severity (called with the
# stringified name) is used.
#
# @param name [#to_s] event name
# @param event [Object] passed through to default_consumer_severity
# @param payload [Object] passed to payload_severity and default_consumer_severity
# @return [Object] the resolved severity
def consumer(name, event:, payload:)
  explicit = payload_severity(payload)
  return explicit if explicit

  default_consumer_severity(name.to_s, event, payload)
end

.default_consumer_severity(name, event, payload) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/julewire/karafka/event_severity.rb', line 45

# Picks the severity of a consumer event from its name alone.
#
# Three names are delegated to dedicated helpers that inspect the event and
# payload; names listed in ERROR_CONSUMER_EVENTS map to :error, names listed
# in DEBUG_CONSUMER_EVENTS map to :debug, and all remaining names map to :info.
#
# @param name [String] event name
# @param event [Object] forwarded to the delegated helpers
# @param payload [Object] forwarded to the delegated helpers
# @return [Symbol, Object] the severity
def default_consumer_severity(name, event, payload)
  if name == "error.occurred"
    error_severity(event, payload)
  elsif name == "connection.listener.fetch_loop.received"
    fetch_loop_received_severity(event, payload)
  elsif name == "process.notice_signal"
    notice_signal_severity(event, payload)
  elsif ERROR_CONSUMER_EVENTS.include?(name)
    :error
  elsif DEBUG_CONSUMER_EVENTS.include?(name)
    :debug
  else
    :info
  end
end

.default_producer_severity(name) ⇒ Object



55
56
57
58
59
60
# File 'lib/julewire/karafka/event_severity.rb', line 55

# Picks the severity of a producer event from its name alone.
#
# "error.occurred" maps to :error, names listed in DEBUG_PRODUCER_EVENTS map
# to :debug, and every other name maps to :info.
#
# @param name [String] event name
# @return [Symbol] :error, :debug or :info
def default_producer_severity(name)
  if name == "error.occurred"
    :error
  elsif DEBUG_PRODUCER_EVENTS.include?(name)
    :debug
  else
    :info
  end
end

.error_severity(event, payload) ⇒ Object



62
63
64
65
# File 'lib/julewire/karafka/event_severity.rb', line 62

# Grades an error event by its type.
#
# The +:type+ value is read through event_value and stringified; types listed
# in FATAL_ERROR_TYPES yield :fatal, all others (including a missing type)
# yield :error.
#
# @param event [Object] forwarded to event_value
# @param payload [Object] forwarded to event_value
# @return [Symbol] :fatal or :error
def error_severity(event, payload)
  error_type = event_value(event, payload, :type).to_s
  return :fatal if FATAL_ERROR_TYPES.include?(error_type)

  :error
end

.event_value(event, payload, key) ⇒ Object



79
80
81
# File 'lib/julewire/karafka/event_severity.rb', line 79

# Reads +key+ for an event, preferring the payload over the raw event.
#
# The payload_value result is returned when it is truthy; a nil or false
# result falls through to raw_event_value.
#
# @param event [Object] forwarded to raw_event_value
# @param payload [Object] forwarded to payload_value
# @param key [Object] the key to look up
# @return [Object, nil] the first truthy value, else the raw_event_value result
def event_value(event, payload, key)
  from_payload = payload_value(payload, key)
  return from_payload if from_payload

  raw_event_value(event, key)
end

.fetch_loop_received_severity(event, payload) ⇒ Object



67
68
69
70
71
72
# File 'lib/julewire/karafka/event_severity.rb', line 67

# Grades a "fetch loop received" event by how many messages it carried.
#
# The +:messages_buffer+ value is read through event_value and counted with
# collection_count. A count of zero yields :debug; a missing (nil) count or
# a non-zero count yields :info.
#
# @param event [Object] forwarded to event_value
# @param payload [Object] forwarded to event_value
# @return [Symbol] :debug or :info
def fetch_loop_received_severity(event, payload)
  buffer = event_value(event, payload, :messages_buffer)
  received = collection_count(buffer)
  return :info if received.nil?

  received.zero? ? :debug : :info
end

.notice_signal_severity(event, payload) ⇒ Object



74
75
76
77
# File 'lib/julewire/karafka/event_severity.rb', line 74

# Grades a process signal notice.
#
# The +:signal+ value is read through event_value, stringified and
# upper-cased; a name ending in "TTIN" yields :warn, anything else yields :info.
#
# @param event [Object] forwarded to event_value
# @param payload [Object] forwarded to event_value
# @return [Symbol] :warn or :info
def notice_signal_severity(event, payload)
  signal_name = event_value(event, payload, :signal).to_s.upcase
  return :warn if signal_name.end_with?("TTIN")

  :info
end

.payload_severity(payload) ⇒ Object



38
39
40
41
42
43
# File 'lib/julewire/karafka/event_severity.rb', line 38

# Extracts an explicit severity from the payload, if one is present.
#
# Reads +:severity+ first and +:level+ second through payload_value. A truthy
# value is passed to Julewire::Core::Records::Severity.normalize and the
# result returned. Returns nil when neither entry is truthy, or when any
# StandardError is raised along the way.
#
# @param payload [Object] forwarded to payload_value
# @return [Object, nil] the normalized severity, or nil
def payload_severity(payload)
  explicit = payload_value(payload, :severity)
  explicit ||= payload_value(payload, :level)
  return nil unless explicit

  Julewire::Core::Records::Severity.normalize(explicit)
rescue StandardError
  nil
end

.payload_value(payload, key) ⇒ Object



83
84
85
# File 'lib/julewire/karafka/event_severity.rb', line 83

# Reads +key+ from +payload+ by delegating to
# Core::Integration::Values::Read.value.
#
# @param payload [Object] the payload to read from
# @param key [Object] the key to look up
# @return [Object] whatever Core::Integration::Values::Read.value returns
def payload_value(payload, key)
  reader = Core::Integration::Values::Read
  reader.value(payload, key)
end

.producer(name, payload) ⇒ Object



34
35
36
# File 'lib/julewire/karafka/event_severity.rb', line 34

# Resolves the severity for a producer-side event.
#
# A severity obtained from the payload via payload_severity takes precedence;
# otherwise the result of default_producer_severity (called with the
# stringified name) is used.
#
# @param name [#to_s] event name
# @param payload [Object] forwarded to payload_severity
# @return [Object] the resolved severity
def producer(name, payload)
  explicit = payload_severity(payload)
  explicit || default_producer_severity(name.to_s)
end

.raw_event_value(event, key) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/julewire/karafka/event_severity.rb', line 87

# Reads +key+ straight from the event, bypassing the processed payload.
#
# Lookup order:
# 1. the object returned by EventPayload.event_payload(event), when it
#    responds to +key?+: first under +key+, then under +key.to_s+;
# 2. +event[key]+, when the event responds to +[]+;
# 3. otherwise nil.
#
# Any StandardError raised during the lookup results in nil.
#
# @param event [Object] the event to inspect
# @param key [#to_s] the key to look up
# @return [Object, nil] the value found, or nil
def raw_event_value(event, key)
  raw = EventPayload.event_payload(event)

  if raw.respond_to?(:key?)
    return raw[key] if raw.key?(key)

    string_key = key.to_s
    return raw[string_key] if raw.key?(string_key)
  end

  event.respond_to?(:[]) ? event[key] : nil
rescue StandardError
  nil
end