Module: Julewire::Karafka::PayloadReader
- Defined in:
- lib/julewire/karafka/payload_reader.rb
Class Method Summary collapse
- .consumer_partition(consumer, metadata, first) ⇒ Object
- .consumer_payload(payload) ⇒ Object
- .consumer_topic(consumer, metadata, first) ⇒ Object
- .count(messages, metadata) ⇒ Object
- .headers(message) ⇒ Object
- .message_payload(message) ⇒ Object
- .messages_for(consumer) ⇒ Object
- .nested_value(object, *method_names) ⇒ Object
- .value(object, method_name) ⇒ Object
Class Method Details
.consumer_partition(consumer, metadata, first) ⇒ Object
60 61 62 |
# File 'lib/julewire/karafka/payload_reader.rb', line 60
# Resolves the partition for a consumer invocation.
#
# Sources are tried in order: the consumer itself, the batch metadata, then
# the first message. The first truthy result wins (note that partition 0 is
# truthy in Ruby).
#
# NOTE(review): the `metadata` identifier was missing from the extracted
# listing and has been restored from the signature in the method summary;
# confirm against lib/julewire/karafka/payload_reader.rb.
#
# @param consumer [Object, nil] object read for `:partition` via `value`
# @param metadata [Object, nil] batch metadata read for `:partition` via `value`
# @param first [Object, nil] first message read for `:partition` via `value`
# @return [Object, nil] the first truthy partition found, otherwise the
#   falsy result of the last lookup
def consumer_partition(consumer, metadata, first)
  value(consumer, :partition) || value(metadata, :partition) || value(first, :partition)
end
.consumer_payload(payload) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/julewire/karafka/payload_reader.rb', line 7
# Builds a compact hash describing the consumer found in an instrumentation
# payload and the batch of messages it is handling.
#
# The consumer is read from the payload's `:caller`. Batch metadata is read
# from the consumer's `:messages` object, while the message list used for
# `first`/`last` comes from `messages_for`, which always returns an Array.
# Entries whose value is nil are removed by `compact`.
#
# NOTE(review): every identifier containing "messages"/"metadata" was missing
# from the extracted listing. The local names below (`batch`, `metadata`,
# `messages`) and the `messages_for` call are reconstructed from the method
# summary and the surrounding calls; confirm against
# lib/julewire/karafka/payload_reader.rb.
#
# @param payload [Object, nil] instrumentation payload read for `:caller`
# @return [Hash{Symbol => Object}] any of :consumer_class, :consumer_id,
#   :consumer_group, :subscription_group, :topic, :partition,
#   :messages_count, :first_offset, :last_offset, :processing_lag_ms,
#   :consumption_lag_ms (nil-valued keys omitted)
def consumer_payload(payload)
  consumer = value(payload, :caller)
  batch = value(consumer, :messages)
  metadata = value(batch, :metadata)
  # messages_for never returns nil, so .first/.last below are safe.
  messages = messages_for(consumer)
  first = messages.first
  last = messages.last

  {
    consumer_class: consumer&.class&.name,
    consumer_id: value(consumer, :id),
    consumer_group: nested_value(consumer, :topic, :consumer_group, :id),
    subscription_group: nested_value(consumer, :topic, :subscription_group, :id),
    topic: consumer_topic(consumer, metadata, first),
    partition: consumer_partition(consumer, metadata, first),
    messages_count: count(messages, metadata),
    # Metadata offsets take precedence; fall back to the boundary messages.
    first_offset: value(metadata, :first_offset) || value(first, :offset),
    last_offset: value(metadata, :last_offset) || value(last, :offset),
    processing_lag_ms: value(metadata, :processing_lag),
    consumption_lag_ms: value(metadata, :consumption_lag)
  }.compact
end
.consumer_topic(consumer, metadata, first) ⇒ Object
56 57 58 |
# File 'lib/julewire/karafka/payload_reader.rb', line 56
# Resolves the topic name for a consumer invocation.
#
# Sources are tried in order: the consumer's `topic.name`, the batch
# metadata's `:topic`, then the first message's `:topic`. The first truthy
# result wins.
#
# NOTE(review): the `metadata` identifier was missing from the extracted
# listing and has been restored from the signature in the method summary;
# confirm against lib/julewire/karafka/payload_reader.rb.
#
# @param consumer [Object, nil] object read for `:topic` then `:name` via `nested_value`
# @param metadata [Object, nil] batch metadata read for `:topic` via `value`
# @param first [Object, nil] first message read for `:topic` via `value`
# @return [Object, nil] the first truthy topic found, otherwise the falsy
#   result of the last lookup
def consumer_topic(consumer, metadata, first)
  nested_value(consumer, :topic, :name) || value(metadata, :topic) || value(first, :topic)
end
.count(messages, metadata) ⇒ Object
52 53 54 |
# File 'lib/julewire/karafka/payload_reader.rb', line 52
# Returns the number of messages in the batch.
#
# Prefers the `:size` reported by the batch metadata; otherwise falls back to
# the size of the message list, mapping an empty list to nil (`nonzero?`).
#
# NOTE(review): both parameter names and their uses were missing from the
# extracted listing. They are restored from the signature in the method
# summary, with `metadata` placed in the `value(..., :size)` lookup and
# `messages` on the direct `.size` call; confirm against
# lib/julewire/karafka/payload_reader.rb.
#
# @param messages [#size] message list; `.size` is called on it directly, so it must not be nil
# @param metadata [Object, nil] batch metadata read for `:size` via `value`
# @return [Object, nil] metadata size when truthy, else the list size, or
#   nil when the list is empty
def count(messages, metadata)
  value(metadata, :size) || messages.size.nonzero?
end
.headers(message) ⇒ Object
48 49 50 |
# File 'lib/julewire/karafka/payload_reader.rb', line 48
# Returns the headers of a message, defaulting to an empty hash.
#
# NOTE(review): the `message` identifier was missing from the extracted
# listing and has been restored from the signature in the method summary;
# confirm against lib/julewire/karafka/payload_reader.rb.
#
# @param message [Object, nil] message read for `:headers` via `value`
# @return [Object] the headers when truthy, otherwise a new empty Hash
def headers(message)
  value(message, :headers) || {}
end
.message_payload(message) ⇒ Object
30 31 32 33 34 35 36 37 38 |
# File 'lib/julewire/karafka/payload_reader.rb', line 30
# Builds a compact hash describing a single message.
#
# Entries whose value is nil are removed by `compact`. The :headers entry
# comes from `headers(message)` rather than a direct `value` lookup.
#
# NOTE(review): the method name and every `message` identifier were missing
# from the extracted listing and have been restored from the signature in
# the method summary; confirm against lib/julewire/karafka/payload_reader.rb.
#
# @param message [Object, nil] message read for :topic, :partition, :offset
#   and :key via `value`, and for headers via `headers`
# @return [Hash{Symbol => Object}] any of :topic, :partition, :offset, :key,
#   :headers (nil-valued keys omitted)
def message_payload(message)
  {
    topic: value(message, :topic),
    partition: value(message, :partition),
    offset: value(message, :offset),
    key: value(message, :key),
    headers: headers(message)
  }.compact
end
.messages_for(consumer) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/julewire/karafka/payload_reader.rb', line 40
# Returns the messages a consumer is handling as an Array.
#
# When the consumer exposes a truthy `:messages` value it is coerced with
# `Array()`. Otherwise a single `:message`, if present, is wrapped in a
# one-element array; with neither, an empty array is returned.
#
# NOTE(review): the method name, both local variables and the arguments to
# `Array(...)` / `[...]` were missing from the extracted listing and have
# been reconstructed from the method summary and the symbols read in the
# body; confirm against lib/julewire/karafka/payload_reader.rb.
#
# @param consumer [Object, nil] object read for `:messages` and `:message` via `value`
# @return [Array] the consumer's messages (never nil)
def messages_for(consumer)
  messages = value(consumer, :messages)
  return Array(messages) if messages

  message = value(consumer, :message)
  message ? [message] : []
end
.nested_value(object, *method_names) ⇒ Object
64 65 66 |
# File 'lib/julewire/karafka/payload_reader.rb', line 64
# Delegates to Core::Integration::Values::Read.nested_value, forwarding the
# object and the chain of method names unchanged.
#
# @param object [Object, nil] root object handed to the reader
# @param method_names [Array<Symbol>] names forwarded to the reader
# @return [Object] whatever Core::Integration::Values::Read.nested_value returns
def nested_value(object, *method_names)
  reader = Core::Integration::Values::Read
  reader.nested_value(object, *method_names)
end
.value(object, method_name) ⇒ Object
68 69 70 |
# File 'lib/julewire/karafka/payload_reader.rb', line 68
# Delegates to Core::Integration::Values::Read.value, forwarding the object
# and the method name unchanged.
#
# @param object [Object, nil] object handed to the reader
# @param method_name [Symbol] name forwarded to the reader
# @return [Object] whatever Core::Integration::Values::Read.value returns
def value(object, method_name)
  reader = Core::Integration::Values::Read
  reader.value(object, method_name)
end