Class: Legion::Transport::Kafka::IncomingMessage

Inherits:
Object
  • Object
show all
Includes:
Logging::Helper
Defined in:
lib/legion/transport/kafka/incoming_message.rb

Overview

Wraps an rdkafka message with a clean, framework-consistent interface. Passed to subscriber blocks instead of the raw Rdkafka::Consumer::Message.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(rdkafka_message) ⇒ IncomingMessage

Returns a new instance of IncomingMessage.



15
16
17
18
19
20
21
22
23
24
# File 'lib/legion/transport/kafka/incoming_message.rb', line 15

# Builds the wrapper by copying each field off the given message once,
# so later reads never go back to the underlying object.
#
# @param rdkafka_message [Object] message responding to topic, partition,
#   offset, key, headers, timestamp and payload
def initialize(rdkafka_message)
  msg = rdkafka_message

  @raw = msg
  @topic = msg.topic
  @partition = msg.partition
  @offset = msg.offset
  @key = msg.key
  # Fall back to an empty Hash when the message reports no headers.
  @headers = msg.headers || {}
  @timestamp = msg.timestamp
  @payload = msg.payload
end

Instance Attribute Details

#headers ⇒ Object (readonly)

Returns the value of attribute headers.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Headers captured from the wrapped message at construction time.
#
# @return [Object] the wrapped message's headers, or an empty Hash when the
#   constructor saw a nil/false headers value
def headers
  @headers
end

#key ⇒ Object (readonly)

Returns the value of attribute key.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Key captured from the wrapped message at construction time.
#
# @return [Object] whatever the wrapped message returned from #key
def key
  @key
end

#offset ⇒ Object (readonly)

Returns the value of attribute offset.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Offset captured from the wrapped message at construction time.
#
# @return [Object] whatever the wrapped message returned from #offset
def offset
  @offset
end

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Partition captured from the wrapped message at construction time.
#
# @return [Object] whatever the wrapped message returned from #partition
def partition
  @partition
end

#payload ⇒ Object (readonly)

Returns the raw string payload.



27
28
29
# File 'lib/legion/transport/kafka/incoming_message.rb', line 27

# Payload exactly as read from the wrapped message; no decoding is applied
# here (see #decoded_payload for the JSON-parsing variant).
#
# @return [Object] whatever the wrapped message returned from #payload
def payload
  @payload
end

#raw ⇒ Object (readonly)

Returns the value of attribute raw.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# The original message object this wrapper was constructed from.
#
# @return [Object] the exact object passed to the constructor
def raw
  @raw
end

#timestamp ⇒ Object (readonly)

Returns the value of attribute timestamp.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Timestamp captured from the wrapped message at construction time.
#
# @return [Object] whatever the wrapped message returned from #timestamp
def timestamp
  @timestamp
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



13
14
15
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13

# Topic captured from the wrapped message at construction time.
#
# @return [Object] whatever the wrapped message returned from #topic
def topic
  @topic
end

Instance Method Details

#decoded_payload ⇒ Object

Attempts to parse the payload as JSON; returns the raw string on failure.



30
31
32
33
34
35
36
37
38
# File 'lib/legion/transport/kafka/incoming_message.rb', line 30

# Best-effort JSON decoding of the payload.
#
# A non-String payload is handed back untouched. A String payload is passed
# to Legion::JSON.parse; if anything raises a StandardError the failure is
# reported through handle_exception at debug level and the stored payload is
# returned as-is.
#
# @return [Object] the parsed value, or the stored payload when it is not a
#   String or could not be parsed
def decoded_payload
  @payload.is_a?(String) ? Legion::JSON.parse(@payload) : @payload
rescue StandardError => error
  handle_exception(
    error,
    level: :debug,
    handled: true,
    operation: 'transport.kafka.incoming_message.decoded_payload',
    topic: topic,
    offset: offset
  )
  @payload
end

#inspect ⇒ Object



44
45
46
# File 'lib/legion/transport/kafka/incoming_message.rb', line 44

# Compact debugging representation showing the class name plus the
# topic, partition and offset of the message.
#
# @return [String]
def inspect
  format(
    '#<%<klass>s topic=%<topic>s partition=%<partition>s offset=%<offset>s>',
    klass: self.class, topic: topic, partition: partition, offset: offset
  )
end

#to_s ⇒ Object



40
41
42
# File 'lib/legion/transport/kafka/incoming_message.rb', line 40

# Short human-readable locator in the form "topic[partition]@offset".
#
# @return [String]
def to_s
  format('%<topic>s[%<partition>s]@%<offset>s', topic: topic, partition: partition, offset: offset)
end