Module: Karafka::Admin
- Defined in:
- lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/configs/resource.rb
Overview
It always initializes a new admin instance as we want to ensure it is always closed Since admin actions are not performed that often, that should be ok.
It always uses the primary defined cluster and does not support multi-cluster work. Cluster on which operations are performed can be changed via ‘admin.kafka` config, however there is no multi-cluster runtime support.
Admin actions that we can perform via Karafka on our Kafka cluster
Defined Under Namespace
Class Method Summary collapse
-
.cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
-
.create_partitions(name, partitions) ⇒ Object
Creates more partitions for a given topic.
-
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings.
-
.delete_consumer_group(consumer_group_id) ⇒ Object
Removes given consumer group (if exists).
-
.delete_topic(name) ⇒ Object
Deleted a given topic.
-
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash<String, Hash<Integer, <Hash<Integer>>>>
Reads lags and offsets for given topics in the context of consumer groups defined in the routing.
-
.read_topic(name, partition, count, start_offset = -1,, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic.
-
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition.
-
.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
Moves the offset on a given consumer group and provided topic to the requested location.
-
.topic_info(topic_name) ⇒ Hash
Returns basic topic metadata.
-
.with_admin ⇒ Object
Creates admin instance and yields it.
-
.with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
412 413 414 |
# File 'lib/karafka/admin.rb', line 412 def cluster_info with_admin(&:metadata) end |
.create_partitions(name, partitions) ⇒ Object
Creates more partitions for a given topic
135 136 137 138 139 140 141 142 143 144 |
# File 'lib/karafka/admin.rb', line 135 def create_partitions(name, partitions) with_admin do |admin| handler = admin.create_partitions(name, partitions) with_re_wait( -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) }, -> { topic_info(name).fetch(:partition_count) >= partitions } ) end end |
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
Creates Kafka topic with given settings
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/karafka/admin.rb', line 106 def create_topic(name, partitions, replication_factor, topic_config = {}) with_admin do |admin| handler = admin.create_topic(name, partitions, replication_factor, topic_config) with_re_wait( -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) }, -> { topics_names.include?(name) } ) end end |
.delete_consumer_group(consumer_group_id) ⇒ Object
This method should not be used on a running consumer group as it will not yield any results.
Removes given consumer group (if exists)
283 284 285 286 287 288 |
# File 'lib/karafka/admin.rb', line 283 def delete_consumer_group(consumer_group_id) with_admin do |admin| handler = admin.delete_group(consumer_group_id) handler.wait(max_wait_timeout: app_config.admin.max_wait_time) end end |
.delete_topic(name) ⇒ Object
Deleted a given topic
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/karafka/admin.rb', line 120 def delete_topic(name) with_admin do |admin| handler = admin.delete_topic(name) with_re_wait( -> { handler.wait(max_wait_timeout: app_config.admin.max_wait_time) }, -> { !topics_names.include?(name) } ) end end |
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash<String, Hash<Integer, <Hash<Integer>>>>
For topics that do not exist, topic details will be set to an empty hash
For topics that exist but were never consumed by a given CG we set ‘-1` as lag and the offset on each of the partitions that were not consumed.
This lag reporting is for committed lags and is “Kafka-centric”, meaning that this represents lags from Kafka perspective and not the consumer. They may differ.
Reads lags and offsets for given topics in the context of consumer groups defined in the
routing
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/karafka/admin.rb', line 318 def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) # We first fetch all the topics with partitions count that exist in the cluster so we # do not query for topics that do not exist and so we can get partitions count for all # the topics we may need. The non-existent and not consumed will be filled at the end existing_topics = cluster_info.topics.map do |topic| [topic[:topic_name], topic[:partition_count]] end.to_h.freeze # If no expected CGs, we use all from routing that have active topics if consumer_groups_with_topics.empty? consumer_groups_with_topics = Karafka::App.routes.map do |cg| cg_topics = cg.topics.select do |cg_topic| active_topics_only ? cg_topic.active? : true end [cg.id, cg_topics.map(&:name)] end.to_h end # We make a copy because we will remove once with non-existing topics # We keep original requested consumer groups with topics for later backfilling cgs_with_topics = consumer_groups_with_topics.dup cgs_with_topics.transform_values!(&:dup) # We can query only topics that do exist, this is why we are cleaning those that do not # exist cgs_with_topics.each_value do |requested_topics| requested_topics.delete_if { |topic| !existing_topics.include?(topic) } end groups_lags = Hash.new { |h, k| h[k] = {} } groups_offs = Hash.new { |h, k| h[k] = {} } cgs_with_topics.each do |cg, topics| # Do not add to tpl topics that do not exist next if topics.empty? tpl = Rdkafka::Consumer::TopicPartitionList.new with_consumer('group.id': cg) do |consumer| topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) } commit_offsets = consumer.committed(tpl) commit_offsets.to_h.each do |topic, partitions| groups_offs[cg][topic] = {} partitions.each do |partition| # -1 when no offset is stored groups_offs[cg][topic][partition.partition] = partition.offset || -1 end end consumer.lag(commit_offsets).each do |topic, partitions_lags| groups_lags[cg][topic] = partitions_lags end end end consumer_groups_with_topics.each do |cg, topics| groups_lags[cg] topics.each do |topic| groups_lags[cg][topic] ||= {} next unless existing_topics.key?(topic) # We backfill because there is a case where our consumer group would consume for # example only one partition out of 20, rest needs to get -1 existing_topics[topic].times do |partition_id| groups_lags[cg][topic][partition_id] ||= -1 end end end merged = Hash.new { |h, k| h[k] = {} } groups_lags.each do |cg, topics| topics.each do |topic, partitions| merged[cg][topic] = {} partitions.each do |partition, lag| merged[cg][topic][partition] = { offset: groups_offs.fetch(cg).fetch(topic).fetch(partition), lag: lag } end end end merged end |
.read_topic(name, partition, count, start_offset = -1,, settings = {}) ⇒ Array<Karafka::Messages::Message>
Allows us to read messages from the topic
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/karafka/admin.rb', line 31 def read_topic(name, partition, count, start_offset = -1, settings = {}) = [] tpl = Rdkafka::Consumer::TopicPartitionList.new low_offset, high_offset = nil with_consumer(settings) do |consumer| # Convert the time offset (if needed) start_offset = resolve_offset(consumer, name.to_s, partition, start_offset) low_offset, high_offset = consumer.query_watermark_offsets(name, partition) # Select offset dynamically if -1 or less and move backwards with the negative # offset, allowing to start from N messages back from high-watermark start_offset = high_offset - count - start_offset.abs + 1 if start_offset.negative? start_offset = low_offset if start_offset.negative? # Build the requested range - since first element is on the start offset we need to # subtract one from requested count to end up with expected number of elements requested_range = (start_offset..start_offset + (count - 1)) # Establish theoretical available range. Note, that this does not handle cases related to # log retention or compaction available_range = (low_offset..(high_offset - 1)) # Select only offset that we can select. This will remove all the potential offsets that # are below the low watermark offset possible_range = requested_range.select { |offset| available_range.include?(offset) } start_offset = possible_range.first count = possible_range.count tpl.add_topic_and_partitions_with_offsets(name, partition => start_offset) consumer.assign(tpl) # We should poll as long as we don't have all the messages that we need or as long as # we do not read all the messages from the topic loop do # If we've got as many messages as we've wanted stop break if .size >= count = consumer.poll(200) next unless # If the message we've got is beyond the requested range, stop break unless possible_range.include?(.offset) << rescue Rdkafka::RdkafkaError => e # End of partition break if e.code == :partition_eof raise e end end # Use topic from routes if we can match it or create a dummy one # Dummy one is used in case we cannot match the topic with routes. This can happen # when admin API is used to read topics that are not part of the routing topic = ::Karafka::Routing::Router.find_or_initialize_by_name(name) .map! do || Messages::Builders::Message.call( , topic, Time.now ) end end |
.read_watermark_offsets(name, partition) ⇒ Array<Integer, Integer>
Fetches the watermark offsets for a given topic partition
295 296 297 298 299 |
# File 'lib/karafka/admin.rb', line 295 def read_watermark_offsets(name, partition) with_consumer do |consumer| consumer.query_watermark_offsets(name, partition) end end |
.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.
Moves the offset on a given consumer group and provided topic to the requested location
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/karafka/admin.rb', line 177 def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) tpl_base = {} # Normalize the data so we always have all partitions and topics in the same format # That is in a format where we have topics and all partitions with their per partition # assigned offsets topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets| tpl_base[topic] = {} if partitions_with_offsets.is_a?(Hash) tpl_base[topic] = partitions_with_offsets else topic_info(topic)[:partition_count].times do |partition| tpl_base[topic][partition] = partitions_with_offsets end end end tpl_base.each_value do |partitions| partitions.transform_values! do |position| # Support both symbol and string based references casted_position = position.is_a?(Symbol) ? position.to_s : position # This remap allows us to transform some special cases in a reference that can be # understood by Kafka case casted_position # Earliest is not always 0. When compacting/deleting it can be much later, that's why # we fetch the oldest possible offset when 'earliest' Time.now - HUNDRED_YEARS # Latest will always be the high-watermark offset and we can get it just by getting # a future position when 'latest' Time.now + HUNDRED_YEARS # Same as `'latest'` when false Time.now - HUNDRED_YEARS # Regular offset case else position end end end tpl = Rdkafka::Consumer::TopicPartitionList.new # In case of time based location, we need to to a pre-resolution, that's why we keep it # separately time_tpl = Rdkafka::Consumer::TopicPartitionList.new # Distribute properly the offset type tpl_base.each do |topic, partitions_with_offsets| partitions_with_offsets.each do |partition, offset| target = offset.is_a?(Time) ? time_tpl : tpl # We reverse and uniq to make sure that potentially duplicated references are removed # in such a way that the newest stays target.to_h[topic] ||= [] target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset) target.to_h[topic].reverse! target.to_h[topic].uniq!(&:partition) target.to_h[topic].reverse! end end settings = { 'group.id': consumer_group_id } with_consumer(settings) do |consumer| # If we have any time based stuff to resolve, we need to do it prior to commits unless time_tpl.empty? real_offsets = consumer.offsets_for_times(time_tpl) real_offsets.to_h.each do |name, results| results.each do |result| raise(Errors::InvalidTimeBasedOffsetError) unless result partition = result.partition # Negative offset means we're beyond last message and we need to query for the # high watermark offset to get the most recent offset and move there if result.offset.negative? _, offset = consumer.query_watermark_offsets(name, result.partition) else # If we get an offset, it means there existed a message close to this time # location offset = result.offset end # Since now we have proper offsets, we can add this to the final tpl for commit tpl.to_h[name] ||= [] tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset) tpl.to_h[name].reverse! tpl.to_h[name].uniq!(&:partition) tpl.to_h[name].reverse! end end end consumer.commit_offsets(tpl, async: false) end end |
.topic_info(topic_name) ⇒ Hash
This query is much more efficient than doing a full ‘#cluster_info` + topic lookup because it does not have to query for all the topics data but just the topic we’re interested in
Returns basic topic metadata
425 426 427 428 429 430 431 432 |
# File 'lib/karafka/admin.rb', line 425 def topic_info(topic_name) with_admin do |admin| admin .(topic_name) .topics .find { |topic| topic[:topic_name] == topic_name } end end |
.with_admin ⇒ Object
Creates admin instance and yields it. After usage it closes the admin instance
467 468 469 470 471 472 473 474 475 476 477 478 479 480 |
# File 'lib/karafka/admin.rb', line 467 def with_admin bind_id = SecureRandom.uuid admin = config(:producer, {}).admin(native_kafka_auto_start: false) bind_oauth(bind_id, admin) admin.start proxy = ::Karafka::Connection::Proxy.new(admin) yield(proxy) ensure admin&.close unbind_oauth(bind_id) end |
.with_consumer(settings = {}) ⇒ Object
We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high frequency calls that would have to be delegated
Creates consumer instance and yields it. After usage it closes the consumer instance This API can be used in other pieces of code and allows for low-level consumer usage
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
# File 'lib/karafka/admin.rb', line 441 def with_consumer(settings = {}) bind_id = SecureRandom.uuid consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false) bind_oauth(bind_id, consumer) consumer.start proxy = ::Karafka::Connection::Proxy.new(consumer) yield(proxy) ensure # Always unsubscribe consumer just to be sure, that no metadata requests are running # when we close the consumer. This in theory should prevent from some race-conditions # that originate from librdkafka begin consumer&.unsubscribe # Ignore any errors and continue to close consumer despite them rescue Rdkafka::RdkafkaError nil end consumer&.close unbind_oauth(bind_id) end |