Class: Rdkafka::Consumer::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/consumer/message.rb

Overview

A message that was consumed from a topic.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(native_message) ⇒ Message

Returns a new instance of Message.

Parameters:



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rdkafka/consumer/message.rb', line 36

# Builds a message by copying the relevant fields out of a native message.
#
# Topic, payload and key are only assigned when their native pointers are
# non-null; otherwise the corresponding instance variables are left unset.
#
# @param native_message the native message to read from; it is indexed with
#   symbols (+:rkt+, +:partition+, +:payload+, +:len+, +:key+, +:key_len+,
#   +:offset+) — presumably an FFI struct, confirm against the bindings
# @return [Message]
def initialize(native_message)
  # Topic name is resolved through the bindings from the topic handle.
  topic_handle = native_message[:rkt]
  @topic = Rdkafka::Bindings.rd_kafka_topic_name(topic_handle) unless topic_handle.null?

  @partition = native_message[:partition]

  # Payload and key are read with their explicit lengths.
  payload_ptr = native_message[:payload]
  @payload = payload_ptr.read_string(native_message[:len]) unless payload_ptr.null?

  key_ptr = native_message[:key]
  @key = key_ptr.read_string(native_message[:key_len]) unless key_ptr.null?

  @offset = native_message[:offset]

  # The raw value is treated as milliseconds; negative values (and nil)
  # yield a nil timestamp.
  epoch_millis = Rdkafka::Bindings.rd_kafka_message_timestamp(native_message, nil)
  @timestamp =
    if epoch_millis && epoch_millis > -1
      whole_seconds = epoch_millis / 1000
      # Time.at takes the sub-second part in microseconds by default.
      leftover_micros = (epoch_millis - whole_seconds * 1000) * 1000
      Time.at(whole_seconds, leftover_micros)
    end

  @headers = Headers.from_native(native_message)
end

Instance Attribute Details

#headers ⇒ Hash{String => String} (readonly)

Returns message headers.

Returns:

  • (Hash{String => String})

    message headers



32
33
34
# File 'lib/rdkafka/consumer/message.rb', line 32

# Message headers (read-only).
#
# @return [Hash{String => String}] message headers
def headers
  @headers
end

#key ⇒ String? (readonly)

This message’s key

Returns:

  • (String, nil)


21
22
23
# File 'lib/rdkafka/consumer/message.rb', line 21

# This message's key (read-only).
#
# @return [String, nil]
def key
  @key
end

#offset ⇒ Integer (readonly)

This message’s offset in its partition

Returns:

  • (Integer)


25
26
27
# File 'lib/rdkafka/consumer/message.rb', line 25

# This message's offset in its partition (read-only).
#
# @return [Integer]
def offset
  @offset
end

#partition ⇒ Integer (readonly)

The partition this message was consumed from

Returns:

  • (Integer)


13
14
15
# File 'lib/rdkafka/consumer/message.rb', line 13

# The partition this message was consumed from (read-only).
#
# @return [Integer]
def partition
  @partition
end

#payload ⇒ String? (readonly)

This message’s payload

Returns:

  • (String, nil)


17
18
19
# File 'lib/rdkafka/consumer/message.rb', line 17

# This message's payload (read-only).
#
# @return [String, nil]
def payload
  @payload
end

#timestamp ⇒ Time? (readonly)

This message’s timestamp, if provided by the broker

Returns:

  • (Time, nil)


29
30
31
# File 'lib/rdkafka/consumer/message.rb', line 29

# This message's timestamp, if provided by the broker (read-only).
#
# @return [Time, nil]
def timestamp
  @timestamp
end

#topic ⇒ String (readonly)

The topic this message was consumed from

Returns:

  • (String)


9
10
11
# File 'lib/rdkafka/consumer/message.rb', line 9

# The topic this message was consumed from (read-only).
#
# @return [String]
def topic
  @topic
end

Instance Method Details

#to_s ⇒ String

Human readable representation of this message.

Returns:

  • (String)


67
68
69
70
71
# File 'lib/rdkafka/consumer/message.rb', line 67

# Human readable representation of this message.
#
# Key and payload are passed through +truncate+ before being embedded; the
# header count is appended only when the message has headers.
#
# @return [String]
def to_s
  header_suffix =
    if @headers.empty?
      ""
    else
      ", headers #{headers.size}"
    end

  "<Message in '#{topic}' with key '#{truncate(key)}', payload '#{truncate(payload)}', " \
    "partition #{partition}, offset #{offset}, timestamp #{timestamp}#{header_suffix}>"
end