Class: Rdkafka::Consumer::TopicPartitionList
- Inherits:
-
Object
- Object
- Rdkafka::Consumer::TopicPartitionList
- Defined in:
- lib/rdkafka/consumer/topic_partition_list.rb
Overview
A list of topics with their partition information
Class Method Summary collapse
-
.from_native_tpl(pointer) ⇒ TopicPartitionList
Create a new topic partition list based of a native one.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
Check equality with another TopicPartitionList.
-
#add_topic(topic, partitions = nil) ⇒ nil
Add a topic with optionally partitions to the list.
-
#add_topic_and_partitions_with_offsets(topic, partitions_with_offsets) ⇒ nil
Add a topic with partitions and offsets set to the list Calling this method multiple times for the same topic will overwrite the previous configuraton.
-
#count ⇒ Integer
Number of items in the list.
-
#empty? ⇒ Boolean
Whether this list is empty.
-
#initialize(data = nil) ⇒ TopicPartitionList
constructor
Create a topic partition list.
-
#to_h ⇒ Hash{String => Array<Partition>,nil}
Return a ‘Hash` with the topics as keys and and an array of partition information as the value if present.
-
#to_native_tpl ⇒ FFI::Pointer
Create a native tpl with the contents of this object added.
-
#to_s ⇒ String
Human readable representation of this list.
Constructor Details
#initialize(data = nil) ⇒ TopicPartitionList
Create a topic partition list.
12 13 14 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 12 def initialize(data = nil) @data = data || {} end |
Class Method Details
.from_native_tpl(pointer) ⇒ TopicPartitionList
Create a new topic partition list based of a native one.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 103 def self.from_native_tpl(pointer) # Data to be moved into the tpl data = {} # Create struct and copy its contents native_tpl = Rdkafka::Bindings::TopicPartitionList.new(pointer) native_tpl[:cnt].times do |i| ptr = native_tpl[:elems] + (i * Rdkafka::Bindings::TopicPartition.size) elem = Rdkafka::Bindings::TopicPartition.new(ptr) if elem[:partition] == Rdkafka::Bindings::RD_KAFKA_PARTITION_UA data[elem[:topic]] = nil else partitions = data[elem[:topic]] || [] offset = if elem[:offset] == Rdkafka::Bindings::RD_KAFKA_OFFSET_INVALID nil else elem[:offset] end partition = Partition.new(elem[:partition], offset, elem[:err]) partitions.push(partition) data[elem[:topic]] = partitions end end # Return the created object TopicPartitionList.new(data) end |
Instance Method Details
#==(other) ⇒ Boolean
Check equality with another TopicPartitionList
92 93 94 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 92 def ==(other) to_h == other.to_h end |
#add_topic(topic, partitions = nil) ⇒ nil
Add a topic with optionally partitions to the list. Calling this method multiple times for the same topic will overwrite the previous configuraton.
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 52 def add_topic(topic, partitions = nil) if partitions.nil? @data[topic.to_s] = nil else if partitions.is_a? Integer partitions = (0..partitions - 1) end @data[topic.to_s] = partitions.map { |p| Partition.new(p, nil, Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR) } end end |
#add_topic_and_partitions_with_offsets(topic, partitions_with_offsets) ⇒ nil
Add a topic with partitions and offsets set to the list Calling this method multiple times for the same topic will overwrite the previous configuraton.
70 71 72 73 74 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 70 def add_topic_and_partitions_with_offsets(topic, partitions_with_offsets) @data[topic.to_s] = partitions_with_offsets.map do |p, o| p.is_a?(Partition) ? p : Partition.new(p, o) end end |
#count ⇒ Integer
Number of items in the list
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 18 def count i = 0 @data.each do |_topic, partitions| i += if partitions partitions.count else 1 end end i end |
#empty? ⇒ Boolean
Whether this list is empty
32 33 34 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 32 def empty? @data.empty? end |
#to_h ⇒ Hash{String => Array<Partition>,nil}
Return a ‘Hash` with the topics as keys and and an array of partition information as the value if present.
79 80 81 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 79 def to_h @data end |
#to_native_tpl ⇒ FFI::Pointer
Create a native tpl with the contents of this object added.
The pointer will be cleaned by ‘rd_kafka_topic_partition_list_destroy` when GC releases it.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 137 def to_native_tpl tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(count) @data.each do |topic, partitions| if partitions partitions.each do |p| Rdkafka::Bindings.rd_kafka_topic_partition_list_add( tpl, topic, p.partition ) if p.offset offset = p.offset.is_a?(Time) ? p.offset.to_f * 1_000 : p.offset Rdkafka::Bindings.rd_kafka_topic_partition_list_set_offset( tpl, topic, p.partition, offset ) end end else Rdkafka::Bindings.rd_kafka_topic_partition_list_add( tpl, topic, Rdkafka::Bindings::RD_KAFKA_PARTITION_UA ) end end tpl end |
#to_s ⇒ String
Human readable representation of this list.
85 86 87 |
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 85 def to_s "<TopicPartitionList: #{to_h}>" end |