Class: Legion::Transport::Kafka::IncomingMessage
- Inherits:
- Object
  - Object
  - Legion::Transport::Kafka::IncomingMessage
- Includes:
- Logging::Helper
- Defined in:
- lib/legion/transport/kafka/incoming_message.rb
Overview
Wraps an rdkafka message with a clean, framework-consistent interface. Passed to subscriber blocks instead of the raw Rdkafka::Consumer::Message.
Instance Attribute Summary
- #headers ⇒ Object (readonly)
  Returns the value of attribute headers.
- #key ⇒ Object (readonly)
  Returns the value of attribute key.
- #offset ⇒ Object (readonly)
  Returns the value of attribute offset.
- #partition ⇒ Object (readonly)
  Returns the value of attribute partition.
- #payload ⇒ Object (readonly)
  Returns the raw string payload.
- #raw ⇒ Object (readonly)
  Returns the value of attribute raw.
- #timestamp ⇒ Object (readonly)
  Returns the value of attribute timestamp.
- #topic ⇒ Object (readonly)
  Returns the value of attribute topic.
Instance Method Summary
- #decoded_payload ⇒ Object
  Attempts to parse the payload as JSON; returns the payload unchanged if it is not a String or if parsing fails.
- #initialize(rdkafka_message) ⇒ IncomingMessage (constructor)
  A new instance of IncomingMessage.
- #inspect ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(rdkafka_message) ⇒ IncomingMessage
Returns a new instance of IncomingMessage.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 15
def initialize(rdkafka_message)
  @raw = rdkafka_message
  @topic = rdkafka_message.topic
  @partition = rdkafka_message.partition
  @offset = rdkafka_message.offset
  @key = rdkafka_message.key
  @headers = rdkafka_message.headers || {}
  @timestamp = rdkafka_message.timestamp
  @payload = rdkafka_message.payload
end
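The constructor copies each field from the rdkafka message once and falls back to an empty Hash when the message carries no headers. The sketch below illustrates that behavior under stated assumptions: `FakeRdkafkaMessage` and `SketchIncomingMessage` are hypothetical stand-ins that mirror the constructor so the example runs without Legion or rdkafka installed. They are not part of the library.

```ruby
# Hypothetical stand-in for Rdkafka::Consumer::Message; only the readers
# that the constructor calls are modeled here.
FakeRdkafkaMessage = Struct.new(
  :topic, :partition, :offset, :key, :headers, :timestamp, :payload,
  keyword_init: true
)

# Hypothetical mirror of the documented constructor, not the real class.
class SketchIncomingMessage
  attr_reader :raw, :topic, :partition, :offset, :key, :headers, :timestamp, :payload

  def initialize(rdkafka_message)
    @raw       = rdkafka_message
    @topic     = rdkafka_message.topic
    @partition = rdkafka_message.partition
    @offset    = rdkafka_message.offset
    @key       = rdkafka_message.key
    @headers   = rdkafka_message.headers || {} # nil headers become {}
    @timestamp = rdkafka_message.timestamp
    @payload   = rdkafka_message.payload
  end
end

source = FakeRdkafkaMessage.new(
  topic: 'orders', partition: 0, offset: 42, key: 'order-1',
  headers: nil, timestamp: Time.at(0), payload: '{"id":1}'
)
message = SketchIncomingMessage.new(source)

message.headers            # => {}
message.raw.equal?(source) # => true, the original object stays reachable
```

Because `headers` always responds like a Hash, a subscriber can call `message.headers.fetch('trace-id', nil)` without a nil check, while `raw` remains available for anything the wrapper does not expose.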
Instance Attribute Details
#headers ⇒ Object (readonly)
Returns the value of attribute headers.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def headers
  @headers
end
#key ⇒ Object (readonly)
Returns the value of attribute key.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def key
  @key
end
#offset ⇒ Object (readonly)
Returns the value of attribute offset.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def offset
  @offset
end
#partition ⇒ Object (readonly)
Returns the value of attribute partition.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def partition
  @partition
end
#payload ⇒ Object (readonly)
Returns the raw string payload.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 27
def payload
  @payload
end
#raw ⇒ Object (readonly)
Returns the value of attribute raw.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def raw
  @raw
end
#timestamp ⇒ Object (readonly)
Returns the value of attribute timestamp.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def timestamp
  @timestamp
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
# File 'lib/legion/transport/kafka/incoming_message.rb', line 13
def topic
  @topic
end
Instance Method Details
#decoded_payload ⇒ Object
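Attempts to parse the payload as JSON. Returns the payload unchanged if it is not a String or if parsing fails; a parse failure is reported through handle_exception at debug level.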
# File 'lib/legion/transport/kafka/incoming_message.rb', line 30
def decoded_payload
  return @payload unless @payload.is_a?(String)

  Legion::JSON.parse(@payload)
rescue StandardError => e
  handle_exception(e, level: :debug, handled: true,
                      operation: 'transport.kafka.incoming_message.decoded_payload',
                      topic: topic, offset: offset)
  @payload
end
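The parse-with-fallback behavior can be sketched in isolation. This is a minimal illustration, not the library's code: `decode_payload_sketch` is a hypothetical helper, and it assumes the standard library's `JSON.parse` as a stand-in for `Legion::JSON.parse`, whose exact return shape (for example, string versus symbol keys) is not shown in this page.

```ruby
require 'json'

# Hypothetical helper mirroring the control flow of decoded_payload.
# JSON.parse from the standard library stands in for Legion::JSON.parse.
def decode_payload_sketch(payload)
  # Non-String payloads (nil, already-decoded objects) pass through untouched.
  return payload unless payload.is_a?(String)

  JSON.parse(payload)
rescue StandardError
  # The documented method reports the error at debug level here; this sketch
  # omits logging and simply falls back to the raw string.
  payload
end

parsed    = decode_payload_sketch('{"id": 1, "status": "paid"}')
fallback  = decode_payload_sketch('not json')
untouched = decode_payload_sketch(nil)

parsed    # => {"id"=>1, "status"=>"paid"}
fallback  # => "not json"
untouched # => nil
```

Since the return type depends on the input, a caller that needs structured data may want to check `result.is_a?(Hash)` before indexing into it.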
#inspect ⇒ Object
# File 'lib/legion/transport/kafka/incoming_message.rb', line 44
def inspect
  "#<#{self.class} topic=#{topic} partition=#{partition} offset=#{offset}>"
end
#to_s ⇒ Object
# File 'lib/legion/transport/kafka/incoming_message.rb', line 40
def to_s
  "#{topic}[#{partition}]@#{offset}"
end
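To show the two string forms side by side, the sketch below reuses the format strings from #to_s and #inspect in a small stand-in class. `SketchMessageLabel` is a hypothetical name introduced only for this illustration; the real class builds the same strings from its own topic, partition, and offset readers.

```ruby
# Hypothetical stand-in carrying only the three fields used for formatting.
class SketchMessageLabel
  attr_reader :topic, :partition, :offset

  def initialize(topic:, partition:, offset:)
    @topic = topic
    @partition = partition
    @offset = offset
  end

  # Compact form, convenient inside log lines.
  def to_s
    "#{topic}[#{partition}]@#{offset}"
  end

  # Debug form; omits the payload so large messages stay readable.
  def inspect
    "#<#{self.class} topic=#{topic} partition=#{partition} offset=#{offset}>"
  end
end

label = SketchMessageLabel.new(topic: 'orders', partition: 3, offset: 128)

label.to_s    # => "orders[3]@128"
label.inspect # => "#<SketchMessageLabel topic=orders partition=3 offset=128>"
```

Neither form includes the payload or headers, so both are safe to interpolate into log messages without leaking message contents.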