Module: Julewire::Karafka::PayloadReader

Defined in:
lib/julewire/karafka/payload_reader.rb

Class Method Summary collapse

Class Method Details

.consumer_partition(consumer, metadata, first) ⇒ Object



60
61
62
# File 'lib/julewire/karafka/payload_reader.rb', line 60

def consumer_partition(consumer, , first)
  value(consumer, :partition) || value(, :partition) || value(first, :partition)
end

.consumer_payload(payload) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/julewire/karafka/payload_reader.rb', line 7

def consumer_payload(payload)
  consumer = value(payload, :caller)
  message_set = value(consumer, :messages)
   = value(message_set, :metadata)
  messages = messages_for(consumer)
  first = messages.first
  last = messages.last

  {
    consumer_class: consumer&.class&.name,
    consumer_id: value(consumer, :id),
    consumer_group: nested_value(consumer, :topic, :consumer_group, :id),
    subscription_group: nested_value(consumer, :topic, :subscription_group, :id),
    topic: consumer_topic(consumer, , first),
    partition: consumer_partition(consumer, , first),
    messages_count: count(messages, ),
    first_offset: value(, :first_offset) || value(first, :offset),
    last_offset: value(, :last_offset) || value(last, :offset),
    processing_lag_ms: value(, :processing_lag),
    consumption_lag_ms: value(, :consumption_lag)
  }.compact
end

.consumer_topic(consumer, metadata, first) ⇒ Object



56
57
58
# File 'lib/julewire/karafka/payload_reader.rb', line 56

def consumer_topic(consumer, , first)
  nested_value(consumer, :topic, :name) || value(, :topic) || value(first, :topic)
end

.count(messages, metadata) ⇒ Object



52
53
54
# File 'lib/julewire/karafka/payload_reader.rb', line 52

def count(messages, )
  value(, :size) || messages.size.nonzero?
end

.headers(message) ⇒ Object



48
49
50
# File 'lib/julewire/karafka/payload_reader.rb', line 48

def headers(message)
  value(message, :headers) || {}
end

.message_payload(message) ⇒ Object



30
31
32
33
34
35
36
37
38
# File 'lib/julewire/karafka/payload_reader.rb', line 30

def message_payload(message)
  {
    topic: value(message, :topic),
    partition: value(message, :partition),
    offset: value(message, :offset),
    key: value(message, :key),
    headers: headers(message)
  }.compact
end

.messages_for(consumer) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/julewire/karafka/payload_reader.rb', line 40

def messages_for(consumer)
  messages = value(consumer, :messages)
  return Array(messages) if messages

  message = value(consumer, :message)
  message ? [message] : []
end

.nested_value(object, *method_names) ⇒ Object



64
65
66
# File 'lib/julewire/karafka/payload_reader.rb', line 64

def nested_value(object, *method_names)
  Core::Integration::Values::Read.nested_value(object, *method_names)
end

.value(object, method_name) ⇒ Object



68
69
70
# File 'lib/julewire/karafka/payload_reader.rb', line 68

def value(object, method_name)
  Core::Integration::Values::Read.value(object, method_name)
end