Class: Rdkafka::Consumer::TopicPartitionList

Inherits:
Object
Defined in:
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A list of topics with their partition information.

Constructor Details

#initialize(data = nil) ⇒ TopicPartitionList

Create a topic partition list.

Parameters:

  • data (Hash{String => nil,Partition}) (defaults to: nil)

    The topic and partition data or nil to create an empty list



# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 12

def initialize(data = nil)
  @data = data || {}
end

Class Method Details

.from_native_tpl(pointer) ⇒ TopicPartitionList

Create a new topic partition list based on a native one.

Parameters:

  • pointer (FFI::Pointer)

Pointer to an existing native list. Its contents will be copied.

Returns:

  • (TopicPartitionList)

# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 103

def self.from_native_tpl(pointer)
  # Data to be moved into the tpl
  data = {}

  # Create struct and copy its contents
  native_tpl = Rdkafka::Bindings::TopicPartitionList.new(pointer)
  native_tpl[:cnt].times do |i|
    ptr = native_tpl[:elems] + (i * Rdkafka::Bindings::TopicPartition.size)
    elem = Rdkafka::Bindings::TopicPartition.new(ptr)
    if elem[:partition] == Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
      data[elem[:topic]] = nil
    else
      partitions = data[elem[:topic]] || []
      offset = if elem[:offset] == Rdkafka::Bindings::RD_KAFKA_OFFSET_INVALID
        nil
      else
        elem[:offset]
      end
      partition = Partition.new(elem[:partition], offset, elem[:err])
      partitions.push(partition)
      data[elem[:topic]] = partitions
    end
  end

  # Return the created object
  TopicPartitionList.new(data)
end
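
The grouping loop above can be tried without a native list. The following is a minimal sketch, assuming plain Structs in place of the FFI-backed structs; `SketchElem`, `SketchPartition` and `group_elems` are hypothetical names used only for illustration, and the two constants carry the values librdkafka defines for `RD_KAFKA_PARTITION_UA` and `RD_KAFKA_OFFSET_INVALID`.

```ruby
# Stand-ins (assumptions): plain Structs replace the FFI-backed native structs.
SketchElem      = Struct.new(:topic, :partition, :offset, :err)
SketchPartition = Struct.new(:partition, :offset, :err)

PARTITION_UA   = -1     # value of librdkafka's RD_KAFKA_PARTITION_UA
OFFSET_INVALID = -1001  # value of librdkafka's RD_KAFKA_OFFSET_INVALID

# Group elements by topic, following the same rules as the loop in
# from_native_tpl: an unassigned partition maps the topic to nil, and an
# invalid offset becomes nil.
def group_elems(elems)
  elems.each_with_object({}) do |elem, data|
    if elem.partition == PARTITION_UA
      data[elem.topic] = nil
    else
      offset = elem.offset == OFFSET_INVALID ? nil : elem.offset
      (data[elem.topic] ||= []) << SketchPartition.new(elem.partition, offset, elem.err)
    end
  end
end

grouped = group_elems([
  SketchElem.new("events", 0, 42, 0),
  SketchElem.new("events", 1, OFFSET_INVALID, 0),
  SketchElem.new("logs", PARTITION_UA, OFFSET_INVALID, 0)
])
# grouped["events"] holds two partitions; grouped["logs"] is nil.
```

The resulting hash has the same shape that the constructor accepts: topic names as keys, and either nil or a list of partitions as values.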

Instance Method Details

#==(other) ⇒ Boolean

Check equality with another TopicPartitionList. Two lists are equal when their #to_h hashes are equal.

Parameters:

  • other (TopicPartitionList)

Returns:

  • (Boolean)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 92

def ==(other)
  to_h == other.to_h
end

#add_topic(topic, partitions = nil) ⇒ nil

Add a topic, optionally with partitions, to the list. Calling this method multiple times for the same topic will overwrite the previous configuration.

Examples:

Add a topic with unassigned partitions

tpl.add_topic("topic")

Add a topic with assigned partitions

tpl.add_topic("topic", (0..8))

Add a topic with all partitions up to a count

tpl.add_topic("topic", 9)

Parameters:

  • topic (String)

    The topic’s name

  • partitions (Array<Integer>, Range<Integer>, Integer) (defaults to: nil)

    The topic’s partitions or partition count

Returns:

  • (nil)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 52

def add_topic(topic, partitions = nil)
  if partitions.nil?
    @data[topic.to_s] = nil
  else
    if partitions.is_a? Integer
      partitions = (0..partitions - 1)
    end
    @data[topic.to_s] = partitions.map { |p| Partition.new(p, nil, Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR) }
  end
end
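
To make the three accepted argument forms concrete, here is a small sketch of the normalization step on its own. `expand_partitions` is a hypothetical helper, not part of the gem; it returns only the partition ids, whereas the method above wraps each id in a `Partition`.

```ruby
# Hypothetical helper: the partition ids add_topic would create for each
# accepted argument form. nil stands for "no partitions assigned".
def expand_partitions(partitions)
  return nil if partitions.nil?

  # An Integer is treated as a count, so 3 becomes the ids 0, 1 and 2.
  partitions = (0...partitions) if partitions.is_a?(Integer)
  partitions.to_a
end

expand_partitions(nil)     # unassigned
expand_partitions(3)       # ids 0, 1, 2
expand_partitions(0..8)    # ids 0 through 8
expand_partitions([1, 4])  # exactly the ids given
```

Under this reading, passing the count 9 and passing the range (0..8) describe the same nine partitions.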

#add_topic_and_partitions_with_offsets(topic, partitions_with_offsets) ⇒ nil

Add a topic with partitions and offsets set to the list. Calling this method multiple times for the same topic will overwrite the previous configuration.

Parameters:

  • topic (String)

    The topic’s name

  • partitions_with_offsets (Hash{Integer => Integer}, Array<Consumer::Partition>)

    The topic’s partitions and offsets (Hash) or partitions with offsets and metadata (Array)

Returns:

  • (nil)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 70

def add_topic_and_partitions_with_offsets(topic, partitions_with_offsets)
  @data[topic.to_s] = partitions_with_offsets.map do |p, o|
    p.is_a?(Partition) ? p : Partition.new(p, o)
  end
end
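
The single `map` block handles both input shapes because a Hash yields key and value pairs, while an Array of partition objects yields one object with the second block argument left as nil. The sketch below illustrates this with a stand-in Struct; `OffsetPartition` and `normalize_offsets` are hypothetical names, and the real `Partition` class may carry more fields.

```ruby
# Stand-in (assumption) for the partition class, with only the two fields
# this sketch needs.
OffsetPartition = Struct.new(:partition, :offset)

# Same block shape as add_topic_and_partitions_with_offsets: existing
# partition objects pass through, and id/offset pairs are wrapped.
def normalize_offsets(partitions_with_offsets)
  partitions_with_offsets.map do |p, o|
    p.is_a?(OffsetPartition) ? p : OffsetPartition.new(p, o)
  end
end

from_hash  = normalize_offsets({ 0 => 100, 1 => 250 })
from_array = normalize_offsets([OffsetPartition.new(2, 7)])
```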

#count ⇒ Integer

Number of items in the list. A topic without assigned partitions counts as one item.

Returns:

  • (Integer)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 18

def count
  i = 0
  @data.each do |_topic, partitions|
    i += if partitions
      partitions.count
    else
      1
    end
  end
  i
end
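
The counting rule can be written compactly. This is a sketch over a plain Hash, assuming the same data shape the list stores internally; `entry_count` is a hypothetical helper name.

```ruby
# Count list entries: each assigned partition counts once, and a topic
# with no assigned partitions (nil) counts as a single entry.
def entry_count(data)
  data.sum { |_topic, partitions| partitions ? partitions.count : 1 }
end

entry_count({ "a" => nil, "b" => [0, 1, 2] })  # one entry for "a", three for "b"
```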

#empty? ⇒ Boolean

Whether this list is empty, meaning it contains no topics.

Returns:

  • (Boolean)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 32

def empty?
  @data.empty?
end

#to_h ⇒ Hash{String => Array<Partition>,nil}

Return a `Hash` with the topics as keys and an array of partition information as the value if present.

Returns:

  • (Hash{String => Array<Partition>,nil})

# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 79

def to_h
  @data
end
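
Because the method returns the internal hash itself rather than a copy, changes made through the returned hash are visible in the list. A minimal sketch, assuming a stand-in class (`MiniList`, a hypothetical name) that copies only the constructor and `to_h` from the source above:

```ruby
# Stand-in (assumption) that reproduces only initialize and to_h.
class MiniList
  def initialize(data = nil)
    @data = data || {}
  end

  def to_h
    @data
  end
end

list = MiniList.new
list.to_h["topic"] = nil  # mutates the list's own storage
list.to_h.key?("topic")   # the change is visible on the next call
```

Callers that need an independent snapshot can call `dup` on the result, keeping in mind that `dup` is a shallow copy.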

#to_native_tpl ⇒ FFI::Pointer

Create a native tpl with the contents of this object added.

The pointer will be cleaned by `rd_kafka_topic_partition_list_destroy` when GC releases it.

Returns:

  • (FFI::Pointer)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 137

def to_native_tpl
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(count)

  @data.each do |topic, partitions|
    if partitions
      partitions.each do |p|
        Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
          tpl,
          topic,
          p.partition
        )

        if p.offset
          offset = p.offset.is_a?(Time) ? p.offset.to_f * 1_000 : p.offset

          Rdkafka::Bindings.rd_kafka_topic_partition_list_set_offset(
            tpl,
            topic,
            p.partition,
            offset
          )
        end
      end
    else
      Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
        tpl,
        topic,
        Rdkafka::Bindings::RD_KAFKA_PARTITION_UA
      )
    end
  end

  tpl
end
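
The offset handling in this method accepts either a numeric offset or a `Time`, which is converted to milliseconds since the Unix epoch. The conversion can be sketched in isolation; `native_offset` is a hypothetical helper name, and the expression is copied from the source above.

```ruby
# Convert an offset the way to_native_tpl does: a Time becomes
# milliseconds since the epoch, anything else is passed through unchanged.
def native_offset(offset)
  offset.is_a?(Time) ? offset.to_f * 1_000 : offset
end

native_offset(42)                       # numeric offsets are untouched
native_offset(Time.at(1_700_000_000))   # a Float number of milliseconds
```

Note that the `Time` branch produces a Float. Whether that value should be rounded or floored before it reaches the native call is not settled by this page.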

#to_s ⇒ String

Human readable representation of this list.

Returns:

  • (String)


# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 85

def to_s
  "<TopicPartitionList: #{to_h}>"
end