Class: Karafka::Admin::ConsumerGroups
- Inherits: Karafka::Admin
  - Object
  - Karafka::Admin
  - Karafka::Admin::ConsumerGroups
- Defined in: lib/karafka/admin/consumer_groups.rb
Overview
Consumer group administration operations. Provides methods to manage Kafka consumer groups, including offset management, migration, and introspection.
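A minimal sketch of how the class is typically invoked (the group name below is hypothetical); every operation is exposed both as a class method and as an instance method, with the class methods delegating to a fresh instance:

# Class-level call (delegates to ConsumerGroups.new internally)
Karafka::Admin::ConsumerGroups.delete('example_group')

# Equivalent instance-level call
Karafka::Admin::ConsumerGroups.new.delete('example_group')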
Constant Summary
Constants inherited from Karafka::Admin
Instance Attribute Summary
Attributes inherited from Karafka::Admin
Class Method Summary
- .copy(previous_name, new_name, topics) ⇒ Object
- .delete(consumer_group_id) ⇒ Object
- .read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object
- .rename(previous_name, new_name, topics, delete_previous: true) ⇒ Object
- .seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
- .trigger_rebalance(consumer_group_id) ⇒ Object
Instance Method Summary
- #copy(previous_name, new_name, topics) ⇒ Boolean
  Takes consumer group and its topics and copies all the offsets to a new named group.
- #delete(consumer_group_id) ⇒ void
  Removes given consumer group (if exists).
- #read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
  Reads lags and offsets for given topics in the context of consumer groups defined in the routing.
- #rename(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
  Takes consumer group and its topics and migrates all the offsets to a new named group.
- #seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ void
  Moves the offset on a given consumer group and provided topic to the requested location.
- #trigger_rebalance(consumer_group_id) ⇒ void
  Triggers a rebalance for the specified consumer group by briefly joining and leaving.
Methods inherited from Karafka::Admin
cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_topic, #read_topic, read_watermark_offsets, #read_watermark_offsets, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, with_admin, #with_admin, with_consumer, #with_consumer
Constructor Details
This class inherits a constructor from Karafka::Admin
Class Method Details
.copy(previous_name, new_name, topics) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 32

def copy(previous_name, new_name, topics)
  new.copy(previous_name, new_name, topics)
end
.delete(consumer_group_id) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 47

def delete(consumer_group_id)
  new.delete(consumer_group_id)
end
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 61

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
.rename(previous_name, new_name, topics, delete_previous: true) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 41

def rename(previous_name, new_name, topics, delete_previous: true)
  new.rename(previous_name, new_name, topics, delete_previous: delete_previous)
end
.seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 24

def seek(consumer_group_id, topics_with_partitions_and_offsets)
  new.seek(consumer_group_id, topics_with_partitions_and_offsets)
end
.trigger_rebalance(consumer_group_id) ⇒ Object
# File 'lib/karafka/admin/consumer_groups.rb', line 53

def trigger_rebalance(consumer_group_id)
  new.trigger_rebalance(consumer_group_id)
end
Instance Method Details
#copy(previous_name, new_name, topics) ⇒ Boolean
This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.
If the new consumer group exists, old offsets will be added to it.
Takes a consumer group and its topics and copies all the offsets to a new named group.
# File 'lib/karafka/admin/consumer_groups.rb', line 213

def copy(previous_name, new_name, topics)
  remap = Hash.new { |h, k| h[k] = {} }

  old_lags = read_lags_with_offsets({ previous_name => topics })

  return false if old_lags.empty?
  return false if old_lags.values.all? { |topic_data| topic_data.values.all?(&:empty?) }

  read_lags_with_offsets({ previous_name => topics })
    .fetch(previous_name)
    .each do |topic, partitions|
      partitions.each do |partition_id, details|
        offset = details[:offset]

        # No offset on this partition
        next if offset.negative?

        remap[topic][partition_id] = offset
      end
    end

  seek(new_name, remap)

  true
end
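A usage sketch, assuming hypothetical group names 'old_group' and 'new_group' and a hypothetical 'payments' topic:

# Copy committed offsets from 'old_group' to 'new_group' for the 'payments' topic.
# Returns true when offsets were found and copied, false otherwise.
copied = Karafka::Admin::ConsumerGroups.copy('old_group', 'new_group', %w[payments])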
#delete(consumer_group_id) ⇒ void
This method should not be used on a running consumer group as it will not yield any results.
This method returns an undefined value.
Removes the given consumer group (if it exists).
# File 'lib/karafka/admin/consumer_groups.rb', line 276

def delete(consumer_group_id)
  with_admin do |admin|
    handler = admin.delete_group(consumer_group_id)
    handler.wait(max_wait_timeout_ms: max_wait_time_ms)
  end
end
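A usage sketch with a hypothetical group id:

# Remove the 'old_group' consumer group and its committed offsets.
# The group must not be actively running for this to take effect.
Karafka::Admin::ConsumerGroups.delete('old_group')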
#read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
For topics that do not exist, topic details will be set to an empty hash.
For topics that exist but were never consumed by a given CG, we set `-1` as the lag and the offset on each of the partitions that were not consumed.
This lag reporting is for committed lags and is “Kafka-centric”, meaning that it represents lags from the Kafka perspective and not the consumer's. They may differ.
Reads lags and offsets for given topics in the context of consumer groups defined in the routing.
# File 'lib/karafka/admin/consumer_groups.rb', line 372

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  # We first fetch all the topics with partitions count that exist in the cluster so we
  # do not query for topics that do not exist and so we can get partitions count for all
  # the topics we may need. The non-existent and not consumed will be filled at the end
  existing_topics = cluster_info.topics.to_h do |topic|
    [topic[:topic_name], topic[:partition_count]]
  end.freeze

  # If no expected CGs, we use all from routing that have active topics
  if consumer_groups_with_topics.empty?
    consumer_groups_with_topics = Karafka::App.routes.to_h do |cg|
      cg_topics = cg.topics.select do |cg_topic|
        active_topics_only ? cg_topic.active? : true
      end

      [cg.id, cg_topics.map(&:name)]
    end
  end

  # We make a copy because we will remove once with non-existing topics
  # We keep original requested consumer groups with topics for later backfilling
  cgs_with_topics = consumer_groups_with_topics.dup
  cgs_with_topics.transform_values!(&:dup)

  # We can query only topics that do exist, this is why we are cleaning those that do not
  # exist
  cgs_with_topics.each_value do |requested_topics|
    requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
  end

  groups_lags = Hash.new { |h, k| h[k] = {} }
  groups_offs = Hash.new { |h, k| h[k] = {} }

  cgs_with_topics.each do |cg, topics|
    # Do not add to tpl topics that do not exist
    next if topics.empty?

    tpl = Rdkafka::Consumer::TopicPartitionList.new

    with_consumer("group.id": cg) do |consumer|
      topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

      commit_offsets = consumer.committed(tpl)

      commit_offsets.to_h.each do |topic, partitions|
        groups_offs[cg][topic] = {}

        partitions.each do |partition|
          # -1 when no offset is stored
          groups_offs[cg][topic][partition.partition] = partition.offset || -1
        end
      end

      consumer.lag(commit_offsets).each do |topic, partitions_lags|
        groups_lags[cg][topic] = partitions_lags
      end
    end
  end

  consumer_groups_with_topics.each do |cg, topics|
    groups_lags[cg]

    topics.each do |topic|
      groups_lags[cg][topic] ||= {}

      next unless existing_topics.key?(topic)

      # We backfill because there is a case where our consumer group would consume for
      # example only one partition out of 20, rest needs to get -1
      existing_topics[topic].times do |partition_id|
        groups_lags[cg][topic][partition_id] ||= -1
      end
    end
  end

  merged = Hash.new { |h, k| h[k] = {} }

  groups_lags.each do |cg, topics|
    topics.each do |topic, partitions|
      merged[cg][topic] = {}

      partitions.each do |partition, lag|
        merged[cg][topic][partition] = {
          offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
          lag: lag
        }
      end
    end
  end

  merged
end
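A usage sketch with hypothetical group and topic names; the result shape follows the merged hash built in the listing above:

# Restrict the query to one consumer group and one topic
lags = Karafka::Admin::ConsumerGroups.read_lags_with_offsets(
  { 'example_group' => %w[payments] }
)

# Expected shape (values are illustrative):
# { 'example_group' => { 'payments' => { 0 => { offset: 42, lag: 3 } } } }
lags.fetch('example_group').fetch('payments').each do |partition, details|
  puts "partition #{partition}: offset=#{details[:offset]} lag=#{details[:lag]}"
end

# Called without arguments, it uses all consumer groups with active topics from the routing
all_lags = Karafka::Admin::ConsumerGroups.read_lags_with_offsets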
#rename(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.
After migration, unless `delete_previous` is set to `false`, the old group will be removed.
If the new consumer group exists, old offsets will be added to it.
Takes a consumer group and its topics and migrates all the offsets to a new named group.
# File 'lib/karafka/admin/consumer_groups.rb', line 257

def rename(previous_name, new_name, topics, delete_previous: true)
  copy_result = copy(previous_name, new_name, topics)

  return false unless copy_result
  return copy_result unless delete_previous

  delete(previous_name)

  true
end
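A usage sketch with hypothetical group and topic names:

# Migrate offsets from 'old_group' to 'new_group' and remove 'old_group' afterwards
Karafka::Admin::ConsumerGroups.rename('old_group', 'new_group', %w[payments])

# Keep the previous group around after copying its offsets
Karafka::Admin::ConsumerGroups.rename(
  'old_group', 'new_group', %w[payments], delete_previous: false
)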
#seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ void
This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.
This method returns an undefined value.
Moves the offset on a given consumer group and provided topic to the requested location
# File 'lib/karafka/admin/consumer_groups.rb', line 102

def seek(consumer_group_id, topics_with_partitions_and_offsets)
  tpl_base = {}

  # Normalize the data so we always have all partitions and topics in the same format
  # That is in a format where we have topics and all partitions with their per partition
  # assigned offsets
  topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
    tpl_base[topic] = {}

    if partitions_with_offsets.is_a?(Hash)
      tpl_base[topic] = partitions_with_offsets
    else
      topic_info = Topics.new(kafka: @custom_kafka).info(topic)
      topic_info[:partition_count].times do |partition|
        tpl_base[topic][partition] = partitions_with_offsets
      end
    end
  end

  tpl_base.each_value do |partitions|
    partitions.transform_values! do |position|
      # Support both symbol and string based references
      casted_position = position.is_a?(Symbol) ? position.to_s : position

      # This remap allows us to transform some special cases in a reference that can be
      # understood by Kafka
      case casted_position
      # Earliest is not always 0. When compacting/deleting it can be much later, that's why
      # we fetch the oldest possible offset
      # false is treated the same as 'earliest'
      when "earliest", false
        LONG_TIME_AGO
      # Latest will always be the high-watermark offset and we can get it just by getting
      # a future position
      when "latest"
        Time.now + DAY_IN_SECONDS
      # Regular offset case
      else
        position
      end
    end
  end

  tpl = Rdkafka::Consumer::TopicPartitionList.new
  # In case of time based location, we need to to a pre-resolution, that's why we keep it
  # separately
  time_tpl = Rdkafka::Consumer::TopicPartitionList.new

  # Distribute properly the offset type
  tpl_base.each do |topic, partitions_with_offsets|
    partitions_with_offsets.each do |partition, offset|
      target = offset.is_a?(Time) ? time_tpl : tpl

      # We reverse and uniq to make sure that potentially duplicated references are removed
      # in such a way that the newest stays
      target.to_h[topic] ||= []
      target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
      target.to_h[topic].reverse!
      target.to_h[topic].uniq!(&:partition)
      target.to_h[topic].reverse!
    end
  end

  settings = { "group.id": consumer_group_id }

  with_consumer(settings) do |consumer|
    # If we have any time based stuff to resolve, we need to do it prior to commits
    unless time_tpl.empty?
      real_offsets = consumer.offsets_for_times(time_tpl)

      real_offsets.to_h.each do |name, results|
        results.each do |result|
          raise(Errors::InvalidTimeBasedOffsetError) unless result

          partition = result.partition

          # Negative offset means we're beyond last message and we need to query for the
          # high watermark offset to get the most recent offset and move there
          if result.offset.negative?
            _, offset = consumer.query_watermark_offsets(name, result.partition)
          else
            # If we get an offset, it means there existed a message close to this time
            # location
            offset = result.offset
          end

          # Since now we have proper offsets, we can add this to the final tpl for commit
          tpl.to_h[name] ||= []
          tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
          tpl.to_h[name].reverse!
          tpl.to_h[name].uniq!(&:partition)
          tpl.to_h[name].reverse!
        end
      end
    end

    consumer.commit_offsets(tpl, async: false)
  end
end
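A usage sketch with hypothetical group and topic names, showing the position formats the listing above accepts (per-partition offsets, earliest/latest references, and time-based locations):

# Seek a single partition to an exact offset
Karafka::Admin::ConsumerGroups.seek('example_group', { 'payments' => { 0 => 100 } })

# Seek all partitions of a topic to the earliest available offsets
Karafka::Admin::ConsumerGroups.seek('example_group', { 'payments' => :earliest })

# Seek all partitions to the high-watermark (latest) offsets
Karafka::Admin::ConsumerGroups.seek('example_group', { 'payments' => :latest })

# Seek to the first offsets at or after a point in time (one hour ago)
Karafka::Admin::ConsumerGroups.seek('example_group', { 'payments' => Time.now - 3600 })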
#trigger_rebalance(consumer_group_id) ⇒ void
This method creates a temporary “fake” consumer that joins the consumer group, triggering a rebalance when it joins and another when it leaves. This should only be used for operational/testing purposes as it causes two rebalances.
The consumer group does not need to be running for this to work, but if it is, it will experience rebalances.
The behavior follows the configured rebalance protocol. For cooperative sticky rebalancing or KIP-848 based protocols, there may be no immediate reaction to the rebalance trigger as these protocols allow incremental partition reassignments without stopping all consumers.
Topics are always detected from the routing configuration. The consumer settings (kafka config) are taken from the first topic in the consumer group to ensure consistency with the actual consumer configuration.
This method returns an undefined value.
Triggers a rebalance for the specified consumer group by briefly joining and leaving
# File 'lib/karafka/admin/consumer_groups.rb', line 310

def trigger_rebalance(consumer_group_id)
  consumer_group = Karafka::App.routes.find { |cg| cg.id == consumer_group_id }

  unless consumer_group
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' not found in routing"
    )
  end

  topics = consumer_group.topics.map(&:name)

  if topics.empty?
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' has no topics"
    )
  end

  # Get the first topic to extract kafka settings
  first_topic = consumer_group.topics.first

  # Build consumer settings using the consumer group's kafka config from first topic
  # This ensures we use the same settings as the actual consumers
  # Following the same pattern as in Karafka::Connection::Client#build_kafka
  consumer_settings = Setup::AttributesMap.consumer(first_topic.kafka.dup)
  consumer_settings[:"group.id"] = consumer_group.id
  consumer_settings[:"enable.auto.offset.store"] = false
  consumer_settings[:"auto.offset.reset"] ||= first_topic.initial_offset

  with_consumer(consumer_settings) do |consumer|
    # Subscribe to the topics - this triggers the first rebalance
    consumer.subscribe(*topics)

    # Wait briefly (100ms) to allow the rebalance to initiate
    # The actual rebalance happens asynchronously, so we just need to give it a moment
    sleep(0.1)

    # Unsubscribe - this will trigger the second rebalance when the consumer closes
    # The ensure block in with_consumer will handle the unsubscribe and close
  end
end
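A usage sketch with a hypothetical group id; the group must be defined in the routing:

# Trigger a rebalance of a consumer group defined in the routing; raises
# Errors::InvalidConfigurationError when the group is unknown or has no topics
Karafka::Admin::ConsumerGroups.trigger_rebalance('example_group')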