Module: Julewire::Karafka::EventPayload

Defined in:
lib/julewire/karafka/event_payload.rb

Class Method Summary

Class Method Details

.call(name, event) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/julewire/karafka/event_payload.rb', line 16

# Builds a sanitized hash from the payload of an instrumentation event.
#
# The payload is obtained through event_payload; anything that is not a
# Hash yields an empty hash. The result starts from
# consumer_payload(name, payload) and every payload entry is then passed
# through safe_value, with nil results left out.
#
# @param name [#to_s] event name, forwarded to consumer_payload
# @param event [Object] event object handed to event_payload
# @return [Hash] sanitized payload, or
#   { payload_error: { exception_class: ... } } when any StandardError is
#   raised while building it
def call(name, event)
  raw = event_payload(event)

  if raw.is_a?(Hash)
    sanitized = consumer_payload(name, raw)
    raw.each do |field, original|
      cleaned = safe_value(field, original)
      next if cleaned.nil?

      sanitized[field] = cleaned
    end
    sanitized
  else
    {}
  end
rescue StandardError => failure
  { payload_error: { exception_class: failure.class.name } }
end

.consumer_batch_event?(name, payload) ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
51
52
53
54
55
# File 'lib/julewire/karafka/event_payload.rb', line 48

# Tells whether an event should be treated as a consumer batch event.
#
# That is the case when the stringified name is listed in
# CONSUMER_BATCH_EVENTS, or when the name is "error.occurred" and the
# payload's :type (read via PayloadReader.value and stringified) matches
# CONSUMER_ERROR_TYPE_PATTERN.
#
# @param name [#to_s] event name
# @param payload [Object] event payload, only consulted for "error.occurred"
# @return [Boolean]
def consumer_batch_event?(name, payload)
  label = name.to_s

  if CONSUMER_BATCH_EVENTS.include?(label)
    true
  elsif label == "error.occurred"
    error_type = PayloadReader.value(payload, :type).to_s
    error_type.match?(CONSUMER_ERROR_TYPE_PATTERN)
  else
    false
  end
end

.consumer_payload(name, payload) ⇒ Object



42
43
44
45
46
# File 'lib/julewire/karafka/event_payload.rb', line 42

# Returns the consumer-specific part of the payload.
#
# @param name [#to_s] event name
# @param payload [Object] event payload
# @return [Object] PayloadReader.consumer_payload(payload) when
#   consumer_batch_event? is true for this event, otherwise an empty Hash
def consumer_payload(name, payload)
  consumer_batch_event?(name, payload) ? PayloadReader.consumer_payload(payload) : {}
end

.error(event) ⇒ Object



34
35
36
37
38
39
40
# File 'lib/julewire/karafka/event_payload.rb', line 34

# Extracts the error object carried by an event, if any.
#
# Reads :error from the event payload through PayloadReader.value and
# falls back to :exception when that is nil or false.
#
# @param event [Object] event object handed to event_payload
# @return [Object, nil] the value found, or nil when a StandardError is
#   raised during the lookup (deliberately best-effort)
def error(event)
  source = event_payload(event)
  primary = PayloadReader.value(source, :error)
  return primary if primary

  PayloadReader.value(source, :exception)
rescue StandardError
  nil
end

.event_payload(event) ⇒ Object



28
29
30
31
32
# File 'lib/julewire/karafka/event_payload.rb', line 28

# Obtains the raw payload from an event-like object.
#
# @param event [Object] object that may respond to #payload or #to_h
# @return [Object, nil] event.payload when the event responds to #payload
#   (even if that is nil), else event.to_h when it responds to #to_h,
#   else nil
def event_payload(event)
  if event.respond_to?(:payload)
    event.payload
  elsif event.respond_to?(:to_h)
    event.to_h
  end
end

.primitive_value(value) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/julewire/karafka/event_payload.rb', line 72

# Reduces an arbitrary payload value to a compact, log-friendly form.
#
# - nil, true, false, String, Symbol and Numeric are returned unchanged.
# - Time becomes a UTC ISO 8601 string with nine fractional digits.
# - Hash gets its top-level keys converted with #to_sym; its values are
#   returned as they are.
# - Array is summarized as { count: size }.
# - Anything else is summarized as { class: "ClassName" }.
#
# @param value [Object] raw payload value
# @return [Object] the value itself, a String, or a summary Hash
def primitive_value(value)
  case value
  when nil, true, false, String, Symbol, Numeric
    value
  when Time
    # Time#utc converts the receiver in place (and raises on a frozen,
    # non-UTC Time); getutc returns a new object so the caller's payload
    # is never modified while it is being sanitized.
    value.getutc.iso8601(9)
  when Hash
    value.transform_keys(&:to_sym)
  when Array
    { count: value.size }
  else
    { class: value.class.name }
  end
rescue StandardError
  # Any conversion failure (e.g. a Hash key without #to_sym) degrades to
  # the class summary instead of propagating.
  { class: value.class.name }
end

.safe_value(key, value) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/julewire/karafka/event_payload.rb', line 57

# Sanitizes a single payload entry according to its key.
#
# The key is symbolized with #to_sym and handled as follows:
# - :caller              -> { class: <class name of the value> }
# - :message             -> PayloadReader.message_payload(value)
# - :messages            -> { count: Array(value).size }
# - :error / :exception  -> nil
# - any other key        -> primitive_value(value)
#
# @param key [#to_sym] payload key
# @param value [Object] payload value
# @return [Object, nil] sanitized value; nil for :error and :exception
def safe_value(key, value)
  field = key.to_sym

  if field == :caller
    { class: value.class.name }
  elsif field == :message
    PayloadReader.message_payload(value)
  elsif field == :messages
    { count: Array(value).size }
  elsif field == :error || field == :exception
    nil
  else
    primitive_value(value)
  end
end