Module: Julewire::Karafka::MessagingAttributes

Defined in:
lib/julewire/karafka/messaging_attributes.rb

Class Method Summary collapse

Class Method Details

.message(fields) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/julewire/karafka/messaging_attributes.rb', line 7

# Builds the messaging attribute set describing the processing of a single
# Kafka message.
#
# The system is always reported as "kafka" and both the operation name and
# operation type as "process". The topic is passed through untouched, while
# partition, offset and key are each run through +string_value+ first
# (helper defined elsewhere in this module; presumably a nil-safe
# stringification -- confirm against its definition).
#
# @param fields [#[]] source of the +:topic+, +:partition+, +:offset+ and
#   +:key+ entries; entries that are absent are forwarded as whatever
#   +fields[...]+ yields for them
# @return [Object] whatever +Core::Fields::AttributeKeys.fields+ returns for
#   the assembled key/value pairs
def message(fields)
  keys = Core::Fields::AttributeKeys

  # Read the inputs in the same order the attributes are emitted below.
  topic = fields[:topic]
  partition_id = string_value(fields[:partition])
  offset = string_value(fields[:offset])
  message_key = string_value(fields[:key])

  keys.fields(
    keys::MESSAGING_SYSTEM => "kafka",
    keys::MESSAGING_OPERATION_NAME => "process",
    keys::MESSAGING_OPERATION_TYPE => "process",
    keys::MESSAGING_DESTINATION_NAME => topic,
    keys::MESSAGING_DESTINATION_PARTITION_ID => partition_id,
    keys::MESSAGING_KAFKA_OFFSET => offset,
    keys::MESSAGING_KAFKA_MESSAGE_KEY => message_key
  )
end

.monitor(name, payload, role:) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/julewire/karafka/messaging_attributes.rb', line 19

# Builds the messaging attribute set for a monitor event.
#
# The system is always reported as "kafka" and the operation name is
# +name.to_s+. Topic and partition are taken from the top level of +payload+
# when truthy there, otherwise from the nested +:message+ entry. Operation
# type, batch message count and offset are delegated to +operation_type+,
# +message_count+ and +first_offset+ respectively (helpers defined elsewhere
# in this module; their exact semantics are not visible here).
#
# @param name [#to_s] event name, also handed to +operation_type+
# @param payload [#[], #dig] event payload; read for +:topic+, +:partition+,
#   +:consumer_group+ and the nested +:message+ entry, and passed whole to
#   +message_count+ and +first_offset+
# @param role [Object] forwarded unchanged to +operation_type+
# @return [Object] whatever +Core::Fields::AttributeKeys.fields+ returns for
#   the assembled key/value pairs
def monitor(name, payload, role:)
  keys = Core::Fields::AttributeKeys

  operation_name = name.to_s
  type = operation_type(name, role: role)

  # Top-level payload entries win; the nested message is only a fallback.
  topic = payload[:topic] || payload.dig(:message, :topic)
  partition = payload[:partition] || payload.dig(:message, :partition)

  keys.fields(
    keys::MESSAGING_SYSTEM => "kafka",
    keys::MESSAGING_OPERATION_NAME => operation_name,
    keys::MESSAGING_OPERATION_TYPE => type,
    keys::MESSAGING_DESTINATION_NAME => topic,
    keys::MESSAGING_DESTINATION_PARTITION_ID => string_value(partition),
    keys::MESSAGING_BATCH_MESSAGE_COUNT => message_count(payload),
    keys::MESSAGING_CONSUMER_GROUP_NAME => payload[:consumer_group],
    keys::MESSAGING_KAFKA_OFFSET => string_value(first_offset(payload))
  )
end