Module: Julewire::Karafka::MessagingAttributes
- Defined in:
- lib/julewire/karafka/messaging_attributes.rb
Class Method Summary collapse
Class Method Details
.message(fields) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/julewire/karafka/messaging_attributes.rb', line 7

# Builds the messaging attributes for a single Kafka message being processed.
#
# The messaging system is fixed to "kafka" and both the operation name and the
# operation type are fixed to "process"; every other value is read from +fields+.
#
# @param fields [#[]] looked up with the keys +:topic+, +:partition+,
#   +:offset+ and +:key+
# @return [Object] whatever +Core::Fields::AttributeKeys.fields+ returns
def message(fields)
  # The hash is passed brace-less on purpose so the call keeps working whether
  # +fields+ on AttributeKeys takes a positional hash or a double splat.
  Core::Fields::AttributeKeys.fields(
    Core::Fields::AttributeKeys::MESSAGING_SYSTEM => "kafka",
    Core::Fields::AttributeKeys::MESSAGING_OPERATION_NAME => "process",
    Core::Fields::AttributeKeys::MESSAGING_OPERATION_TYPE => "process",
    Core::Fields::AttributeKeys::MESSAGING_DESTINATION_NAME => fields[:topic],
    # Partition, offset and key all go through string_value before being stored.
    Core::Fields::AttributeKeys::MESSAGING_DESTINATION_PARTITION_ID => string_value(fields[:partition]),
    Core::Fields::AttributeKeys::MESSAGING_KAFKA_OFFSET => string_value(fields[:offset]),
    Core::Fields::AttributeKeys::MESSAGING_KAFKA_MESSAGE_KEY => string_value(fields[:key])
  )
end
.monitor(name, payload, role:) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/julewire/karafka/messaging_attributes.rb', line 19

# Builds the messaging attributes for a monitor event.
#
# The messaging system is fixed to "kafka". The operation name is +name+ as a
# string, and the operation type comes from +operation_type(name, role: role)+.
# Topic and partition are read from the top level of +payload+ first, falling
# back to the nested +:message+ entry.
#
# @param name [#to_s] event name, stored as the operation name
# @param payload [#[], #dig] event payload; +:topic+, +:partition+,
#   +:consumer_group+ and the nested +:message+ entry are read from it
# @param role [Object] forwarded unchanged to +operation_type+
# @return [Object] whatever +Core::Fields::AttributeKeys.fields+ returns
def monitor(name, payload, role:)
  keys = Core::Fields::AttributeKeys

  keys.fields(
    keys::MESSAGING_SYSTEM => "kafka",
    keys::MESSAGING_OPERATION_NAME => name.to_s,
    keys::MESSAGING_OPERATION_TYPE => operation_type(name, role: role),
    keys::MESSAGING_DESTINATION_NAME => payload[:topic] || payload.dig(:message, :topic),
    keys::MESSAGING_DESTINATION_PARTITION_ID => string_value(
      payload[:partition] || payload.dig(:message, :partition)
    ),
    # NOTE(review): as written this stores the whole payload under the batch
    # message count key; presumably a counting helper is meant to wrap
    # +payload+ here -- confirm against the real source file.
    keys::MESSAGING_BATCH_MESSAGE_COUNT => (payload),
    keys::MESSAGING_CONSUMER_GROUP_NAME => payload[:consumer_group],
    keys::MESSAGING_KAFKA_OFFSET => string_value(first_offset(payload))
  )
end