Class: Rdkafka::Consumer::Message
- Inherits:
-
Object
- Object
- Rdkafka::Consumer::Message
- Defined in:
- lib/rdkafka/consumer/message.rb
Overview
A message that was consumed from a topic.
Instance Attribute Summary collapse
-
#headers ⇒ Hash{String => String}
readonly
Message headers.
-
#key ⇒ String?
readonly
This message’s key.
-
#offset ⇒ Integer
readonly
This message’s offset in its partition.
-
#partition ⇒ Integer
readonly
The partition this message was consumed from.
-
#payload ⇒ String?
readonly
This message’s payload.
-
#timestamp ⇒ Time?
readonly
This message’s timestamp, if provided by the broker.
-
#topic ⇒ String
readonly
The topic this message was consumed from.
Instance Method Summary collapse
-
#initialize(native_message) ⇒ Message
constructor
A new instance of Message.
-
#to_s ⇒ String
Human readable representation of this message.
Constructor Details
#initialize(native_message) ⇒ Message
Returns a new instance of Message.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rdkafka/consumer/message.rb', line 36 def initialize() # Set topic unless [:rkt].null? @topic = Rdkafka::Bindings.rd_kafka_topic_name([:rkt]) end # Set partition @partition = [:partition] # Set payload unless [:payload].null? @payload = [:payload].read_string([:len]) end # Set key unless [:key].null? @key = [:key].read_string([:key_len]) end # Set offset @offset = [:offset] # Set timestamp = Rdkafka::Bindings.(, nil) @timestamp = if && > -1 # Calculate seconds and microseconds seconds = / 1000 milliseconds = ( - seconds * 1000) * 1000 Time.at(seconds, milliseconds) end @headers = Headers.from_native() end |
Instance Attribute Details
#headers ⇒ Hash{String => String} (readonly)
Returns message headers.
32 33 34 |
# File 'lib/rdkafka/consumer/message.rb', line 32 def headers @headers end |
#key ⇒ String? (readonly)
This message’s key
21 22 23 |
# File 'lib/rdkafka/consumer/message.rb', line 21 def key @key end |
#offset ⇒ Integer (readonly)
This message’s offset in its partition
25 26 27 |
# File 'lib/rdkafka/consumer/message.rb', line 25 def offset @offset end |
#partition ⇒ Integer (readonly)
The partition this message was consumed from
13 14 15 |
# File 'lib/rdkafka/consumer/message.rb', line 13 def partition @partition end |
#payload ⇒ String? (readonly)
This message’s payload
17 18 19 |
# File 'lib/rdkafka/consumer/message.rb', line 17 def payload @payload end |
#timestamp ⇒ Time? (readonly)
This message’s timestamp, if provided by the broker
29 30 31 |
# File 'lib/rdkafka/consumer/message.rb', line 29 def @timestamp end |
#topic ⇒ String (readonly)
The topic this message was consumed from
9 10 11 |
# File 'lib/rdkafka/consumer/message.rb', line 9 def topic @topic end |
Instance Method Details
#to_s ⇒ String
Human readable representation of this message.
67 68 69 70 71 |
# File 'lib/rdkafka/consumer/message.rb', line 67 def to_s is_headers = @headers.empty? ? "" : ", headers #{headers.size}" "<Message in '#{topic}' with key '#{truncate(key)}', payload '#{truncate(payload)}', partition #{partition}, offset #{offset}, timestamp #{}#{is_headers}>" end |