Class: Karafka::Admin::Topics
- Inherits: Karafka::Admin
  - Object
    - Karafka::Admin
      - Karafka::Admin::Topics
- Defined in: lib/karafka/admin/topics.rb
Overview
Topic administration operations. Provides methods to manage Kafka topics, including creation, deletion, reading, and introspection.
Constant Summary
Constants inherited from Karafka::Admin
Instance Attribute Summary
Attributes inherited from Karafka::Admin
Class Method Summary
- .create(name, partitions, replication_factor, topic_config = {}) ⇒ Object
- .create_partitions(name, partitions) ⇒ Object
- .delete(name) ⇒ Object
- .info(topic_name) ⇒ Object
- .read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- .read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
- .read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
Instance Method Summary
- #create(name, partitions, replication_factor, topic_config = {}) ⇒ void
  Creates a Kafka topic with the given settings.
- #create_partitions(name, partitions) ⇒ void
  Creates more partitions for a given topic.
- #delete(name) ⇒ void
  Deletes a given topic.
- #info(topic_name) ⇒ Hash
  Returns basic topic metadata.
- #read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
  Reads messages from a given topic partition.
- #read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Array<Hash>
  Queries partition offsets by specification using the Kafka ListOffsets admin API.
- #read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash
  Fetches the watermark offsets for a given topic partition, or for multiple topics and partitions at once.
Methods inherited from Karafka::Admin
#close, cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer
Constructor Details
This class inherits a constructor from Karafka::Admin
Class Method Details
.create(name, partitions, replication_factor, topic_config = {}) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 26
    def create(name, partitions, replication_factor, topic_config = {})
      new.create(name, partitions, replication_factor, topic_config)
    end
.create_partitions(name, partitions) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 39
    def create_partitions(name, partitions)
      new.create_partitions(name, partitions)
    end
.delete(name) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 32
    def delete(name)
      new.delete(name)
    end
.info(topic_name) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 52
    def info(topic_name)
      new.info(topic_name)
    end
.read(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 16
    def read(name, partition, count, start_offset = -1, settings = {})
      new.read(name, partition, count, start_offset, settings)
    end
.read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 59
    def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
      new.read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
    end
.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
    # File 'lib/karafka/admin/topics.rb', line 46
    def read_watermark_offsets(name_or_hash, partition = nil)
      new.read_watermark_offsets(name_or_hash, partition)
    end
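Every class method above is a thin wrapper that builds a new instance and forwards its arguments to the instance method of the same name. A minimal plain-Ruby sketch of that delegation pattern follows; `TopicsSketch` is a hypothetical class, and its stubbed `info` return value is invented for illustration.

```ruby
# Hypothetical stand-in illustrating the class-to-instance delegation
# used by the class methods above; no Kafka connection is involved.
class TopicsSketch
  class << self
    # Class-level convenience method: build a fresh instance and delegate
    def info(topic_name)
      new.info(topic_name)
    end
  end

  # Instance method that would do the actual work (stubbed here)
  def info(topic_name)
    { topic_name: topic_name, partition_count: 1 }
  end
end

TopicsSketch.info("events") # => { topic_name: "events", partition_count: 1 }
```

Creating a fresh instance per call keeps each class-level call independent of any state left over from a previous operation.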
Instance Method Details
#create(name, partitions, replication_factor, topic_config = {}) ⇒ void
This method returns an undefined value.
Creates a Kafka topic with the given settings.
    # File 'lib/karafka/admin/topics.rb', line 152
    def create(name, partitions, replication_factor, topic_config = {})
      with_admin do |admin|
        handler = admin.create_topic(name, partitions, replication_factor, topic_config)

        with_re_wait(
          -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
          -> { names.include?(name) }
        )
      end
    end
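The body of `with_re_wait` is not shown on this page. Its call sites pass two lambdas: one that waits on the broker operation handle and one that checks whether the cluster has reached the desired state. The sketch below assumes a wait-then-verify behavior (run the waiter and, if it times out, poll the check before giving up). It is not Karafka's implementation; `re_wait`, `WaitTimeoutError`, `attempts`, and `backoff` are all hypothetical.

```ruby
# Hypothetical sketch of a wait-then-verify helper. It is NOT Karafka's
# with_re_wait; the names and retry parameters are illustrative assumptions.
class WaitTimeoutError < StandardError; end

# Runs the waiter; if it times out, polls the verifier until it reports
# success or the attempts are exhausted, then re-raises the timeout.
def re_wait(waiter, verifier, attempts: 5, backoff: 0.01)
  waiter.call
rescue WaitTimeoutError => e
  attempts.times do
    return if verifier.call

    sleep(backoff)
  end

  raise e
end

# Usage: a waiter that always times out and a state check that succeeds on its 3rd call
checks = 0
re_wait(-> { raise WaitTimeoutError }, -> { (checks += 1) >= 3 })
```

The same pair of lambdas appears in `#delete` (checking that the topic name is gone) and `#create_partitions` (checking the partition count), so one helper can cover all three operations.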
#create_partitions(name, partitions) ⇒ void
This method returns an undefined value.
Creates more partitions for a given topic.
    # File 'lib/karafka/admin/topics.rb', line 185
    def create_partitions(name, partitions)
      with_admin do |admin|
        handler = admin.create_partitions(name, partitions)

        with_re_wait(
          -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
          -> { info(name).fetch(:partition_count) >= partitions }
        )
      end
    end
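The verification lambda compares `:partition_count` with `partitions` using `>=`, which indicates that `partitions` is the desired total partition count for the topic, not the number of partitions to add. Kafka's CreatePartitions request likewise carries the new total, and a topic's partition count can only grow. A minimal sketch of that arithmetic, using a hypothetical `new_partition_ids` helper:

```ruby
# Hypothetical helper: given the current partition count and the desired
# total, return the ids of the partitions that would be created.
# Kafka partition ids are zero-based, so new ones are appended at the end.
def new_partition_ids(current_count, target_total)
  if target_total <= current_count
    raise ArgumentError, "target (#{target_total}) must exceed current count (#{current_count})"
  end

  (current_count...target_total).to_a
end

new_partition_ids(3, 5) # => [3, 4]
```

So growing a 3-partition topic to 5 partitions means passing `5`, not `2`.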
#delete(name) ⇒ void
This method returns an undefined value.
Deletes a given topic.
    # File 'lib/karafka/admin/topics.rb', line 168
    def delete(name)
      with_admin do |admin|
        handler = admin.delete_topic(name)

        with_re_wait(
          -> { handler.wait(max_wait_timeout_ms: max_wait_time_ms) },
          -> { !names.include?(name) }
        )
      end
    end
#info(topic_name) ⇒ Hash
Returns basic topic metadata.
This query is much more efficient than a full `#cluster_info` call followed by a topic lookup, because it fetches metadata only for the topic of interest instead of for all topics.
    # File 'lib/karafka/admin/topics.rb', line 338
    def info(topic_name)
      with_admin do |admin|
        admin
          .(topic_name)
          .topics
          .find { |topic| topic[:topic_name] == topic_name }
      end
    end
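The lookup step of `#info` is an ordinary `Enumerable#find` over the topic entries returned for the single-topic metadata request. The sketch below applies the same block to a hypothetical in-memory array; only the `:topic_name` and `:partition_count` keys appear elsewhere on this page, and the data is invented.

```ruby
# Hypothetical topic entries shaped like the array the lookup iterates over
METADATA_TOPICS = [
  { topic_name: "orders", partition_count: 6 },
  { topic_name: "payments", partition_count: 3 }
].freeze

# Same block as in #info: first entry whose :topic_name matches, else nil
def find_topic(topics, topic_name)
  topics.find { |topic| topic[:topic_name] == topic_name }
end

find_topic(METADATA_TOPICS, "payments") # => { topic_name: "payments", partition_count: 3 }
find_topic(METADATA_TOPICS, "missing")  # => nil
```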
#read(name, partition, count, start_offset = -1, settings = {}) ⇒ Array<Karafka::Messages::Message>
Reads up to `count` messages from a given topic partition.
    # File 'lib/karafka/admin/topics.rb', line 75
    def read(name, partition, count, start_offset = -1, settings = {})
      = []
      tpl = Rdkafka::Consumer::TopicPartitionList.new
      low_offset, high_offset = nil

      with_consumer(settings) do |consumer|
        # Convert the time offset (if needed)
        start_offset = resolve_offset(consumer, name.to_s, partition, start_offset)

        low_offset, high_offset = consumer.query_watermark_offsets(name, partition)

        # Select offset dynamically if -1 or less and move backwards with the negative
        # offset, allowing to start from N messages back from high-watermark
        start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative?
        start_offset = low_offset if start_offset.negative?

        # Build the requested range - since first element is on the start offset we need to
        # subtract one from requested count to end up with expected number of elements
        requested_range = (start_offset..(start_offset + count - 1))
        # Establish theoretical available range. Note, that this does not handle cases related
        # to log retention or compaction
        available_range = (low_offset..(high_offset - 1))
        # Select only offset that we can select. This will remove all the potential offsets
        # that are below the low watermark offset
        possible_range = requested_range.select { |offset| available_range.include?(offset) }

        start_offset = possible_range.first
        count = possible_range.size

        tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset)
        consumer.assign(tpl)

        # We should poll as long as we don't have all the messages that we need or as long as
        # we do not read all the messages from the topic
        loop do
          # If we've got as many messages as we've wanted stop
          break if .size >= count

          = consumer.poll(200)

          next unless

          # If the message we've got is beyond the requested range, stop
          break unless possible_range.include?(.offset)

          <<
        rescue Rdkafka::RdkafkaError => e
          # End of partition
          break if e.code == :partition_eof

          raise e
        end
      end

      # Use topic from routes if we can match it or create a dummy one
      # Dummy one is used in case we cannot match the topic with routes. This can happen
      # when admin API is used to read topics that are not part of the routing
      topic = Karafka::Routing::Router.find_or_initialize_by_name(name)

      .map! do ||
        Messages::Builders::Message.call(
          ,
          topic,
          Time.now
        )
      end
    end
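Once the watermarks are known, the offset arithmetic in `#read` does not depend on the consumer. The sketch below isolates it in a hypothetical `read_range` helper that takes the low and high watermarks as arguments. Time-based start offsets (resolved by `resolve_offset` in the real method) are left out and, as the source comment notes, retention and compaction gaps are not modeled.

```ruby
# Reproduces the range arithmetic from #read for a single partition.
# low/high are the low and high watermark offsets; a negative start_offset
# means "start count messages back from the high watermark".
def read_range(low, high, count, start_offset = -1)
  # Negative start: step back from the high watermark
  start_offset = high - count - start_offset.abs + 1 if start_offset.negative?
  # Still negative (fewer messages than requested): clamp to the low watermark
  start_offset = low if start_offset.negative?

  requested = (start_offset..(start_offset + count - 1))
  available = (low..(high - 1))

  # Keep only offsets that actually lie between the watermarks
  requested.select { |offset| available.include?(offset) }
end

read_range(0, 10, 3)  # => [7, 8, 9]
read_range(5, 8, 10)  # => [5, 6, 7]
```

With the default `start_offset = -1`, the call returns the last `count` offsets. When fewer messages exist than requested, the range is trimmed to what lies between the watermarks, which is why `#read` recomputes `count` from `possible_range.size` before polling.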
#read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Array<Hash>
Queries partition offsets by specification using the Kafka ListOffsets admin API.
This is a lower-level, more flexible alternative to `#read_watermark_offsets`. The key differences are:
- Transactional correctness: `read_watermark_offsets` always returns the high-watermark offset, which includes messages from uncommitted or in-flight transactions. A `READ_COMMITTED` consumer will never see those messages, so lag calculated from the high watermark is overstated on transactionally-produced topics. Passing `isolation_level: Karafka::Admin::IsolationLevels::READ_COMMITTED` here returns the Last Stable Offset (LSO), the highest offset a `READ_COMMITTED` consumer would actually reach, giving accurate lag figures.
- `:max_timestamp` spec: returns the offset of the message with the highest timestamp in the partition. This is not available via watermarks.
- Leader epoch: the response includes `leader_epoch`, which can be used to detect stale metadata or fencing conditions.
- Admin API: operates through the admin client rather than a consumer connection. (The `#read_watermark_offsets` source on this page also calls `with_admin` and `list_offsets`, so in this version both methods share that transport.)
For non-transactional topics, `:latest` here returns the same value as the high watermark reported by `read_watermark_offsets`.
    # File 'lib/karafka/admin/topics.rb', line 322
    def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
      with_admin do |admin|
        handle = admin.list_offsets(topic_partition_offsets, isolation_level: isolation_level)
        handle.wait(max_wait_timeout_ms: max_wait_time_ms).offsets
      end
    end
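To make the transactional-correctness point concrete, the sketch below computes consumer lag against both reference offsets. The offsets are made-up numbers for a single partition and `lag` is a hypothetical helper. In practice the high watermark would come from `read_watermark_offsets` and the LSO from this method with `READ_COMMITTED`.

```ruby
# Hypothetical numbers for one partition of a transactionally-produced topic
committed_offset = 100 # next offset the consumer group will read
high_watermark   = 130 # includes records from still-open transactions
last_stable      = 115 # LSO: highest offset a READ_COMMITTED consumer reaches

# Lag is the distance from the committed offset to a reference end offset,
# floored at zero in case the committed offset is ahead of the reference
def lag(end_offset, committed)
  [end_offset - committed, 0].max
end

lag(high_watermark, committed_offset) # => 30, overstated for READ_COMMITTED
lag(last_stable, committed_offset)    # => 15, what the consumer can actually reach
```

The 15 offsets between the LSO and the high watermark belong to transactions that are not yet committed. A `READ_COMMITTED` consumer cannot make progress on them, so counting them as lag would misreport the consumer as falling behind.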
#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Array<Integer, Integer>, Hash
Fetches the watermark offsets for a given topic partition, or for multiple topics and partitions at once.
    # File 'lib/karafka/admin/topics.rb', line 222
    def read_watermark_offsets(name_or_hash, partition = nil)
      # Normalize input to hash format
      topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

      low_specs = {}
      high_specs = {}

      topics_with_partitions.each do |topic, partitions|
        low_specs[topic] = partitions.map { |p| { partition: p, offset: :earliest } }
        high_specs[topic] = partitions.map { |p| { partition: p, offset: :latest } }
      end

      lows = {}
      highs = {}

      with_admin do |admin|
        admin.list_offsets(low_specs).wait(max_wait_timeout_ms: max_wait_time_ms).offsets.each do |r|
          (lows[r[:topic]] ||= {})[r[:partition]] = r[:offset]
        end

        admin.list_offsets(high_specs).wait(max_wait_timeout_ms: max_wait_time_ms).offsets.each do |r|
          (highs[r[:topic]] ||= {})[r[:partition]] = r[:offset]
        end
      end

      result = Hash.new { |h, k| h[k] = {} }

      topics_with_partitions.each do |topic, partitions|
        partitions.each do |partition_id|
          result[topic][partition_id] = [lows.dig(topic.to_s, partition_id), highs.dig(topic.to_s, partition_id)]
        end
      end

      # Return single array for single partition query, hash for multiple
      partition ? result.dig(name_or_hash, partition) : result
    end
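The input normalization and result shaping of `#read_watermark_offsets` can be exercised without a broker. The sketch below replaces the two `list_offsets` calls with a hypothetical in-memory `FAKE_OFFSETS` table, and `watermarks` is a hypothetical name; the rest follows the method above. Because the real method looks results up with `topic.to_s`, the fake table is keyed by string topic names, so symbol topic names work too.

```ruby
# Hypothetical in-memory watermarks standing in for the broker:
# topic => { partition => [low, high] }
FAKE_OFFSETS = {
  "orders" => { 0 => [0, 42], 1 => [5, 17] },
  "payments" => { 0 => [10, 10] }
}.freeze

# Follows #read_watermark_offsets, minus the admin client
def watermarks(name_or_hash, partition = nil)
  # Normalize input to { topic => [partitions] }
  topics_with_partitions = partition ? { name_or_hash => [partition] } : name_or_hash

  result = Hash.new { |h, k| h[k] = {} }

  topics_with_partitions.each do |topic, partitions|
    partitions.each do |partition_id|
      result[topic][partition_id] = FAKE_OFFSETS.dig(topic.to_s, partition_id)
    end
  end

  # Partition given: return [low, high]; otherwise the nested hash
  partition ? result.dig(name_or_hash, partition) : result
end

watermarks("orders", 1) # => [5, 17]
watermarks({ "orders" => [0, 1], "payments" => [0] })
```

Passing a topic name and a partition returns a two-element `[low, high]` array. Passing a hash returns a nested hash keyed like the input, which matches the `Array<Integer, Integer>, Hash` return type in the signature. Partition `0` still selects the array form because `0` is truthy in Ruby.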