Class: Karafka::Pro::Admin::Recovery
- Defined in:
- lib/karafka/pro/admin/recovery.rb,
lib/karafka/pro/admin/recovery/errors.rb
Overview
These methods should NOT be used unless you are experiencing issues that require manual intervention. Misuse can lead to data loss or other problems.
Consumer group recovery toolkit.
Provides coordinator-bypass offset reading and blast-radius assessment for scenarios where the Kafka group coordinator is in a FAILED state and normal admin APIs return NOT_COORDINATOR or time out.
Works for any coordinator failure scenario:
- KAFKA-19862 (compaction race during coordinator load)
- Broker OOM / GC pause making coordinator unreachable
- Network partition isolating the coordinator broker
- Any future bug that transitions a coordinator shard to FAILED
Each consumer group is assigned to a specific __consumer_offsets partition (and therefore a specific coordinator broker) based on its name. When that coordinator enters a FAILED state, all operations for the group - joins, heartbeats, offset commits, and offset fetches - are stuck until the coordinator recovers.
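The group-to-partition mapping is Java's String#hashCode taken modulo the __consumer_offsets partition count, with Kafka's Utils.abs handling of Integer.MIN_VALUE. A minimal plain-Ruby sketch of that mapping (the default of 50 partitions is an assumption; check offsets.topic.num.partitions on your cluster):

```ruby
# Java's String#hashCode: 31-based polynomial over UTF-16 code units,
# with 32-bit signed integer overflow semantics
def java_string_hash_code(string)
  hash = 0
  string.encode(Encoding::UTF_16BE).unpack('n*').each do |unit|
    hash = (31 * hash + unit) & 0xFFFFFFFF
  end
  hash >= 0x8000_0000 ? hash - 0x1_0000_0000 : hash
end

# Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of overflowing
def group_offsets_partition(group_id, partitions_count = 50)
  hash = java_string_hash_code(group_id)
  hash = hash == -2_147_483_648 ? 0 : hash.abs
  hash % partitions_count
end

group_offsets_partition('abc') # => 4 (hashCode 96354 % 50)
```

This is why renaming a group usually lands it on a different __consumer_offsets partition: any change to the name changes the hash, and with it (with high probability) the partition and its coordinator broker.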
A common recovery strategy is migrating to a new consumer group with a different name, which causes Kafka to hash it to a (likely) different __consumer_offsets partition served by a healthy coordinator. This class provides the tools to:
1. Read committed offsets directly from the raw __consumer_offsets log (bypassing the broken coordinator) via {#read_committed_offsets}
2. Assess blast radius: which broker coordinates a group ({#coordinator_for}), which partitions a broker leads ({#affected_partitions}), and which groups are affected ({#affected_groups})
To complete the migration, use Admin::ConsumerGroups.seek to write the recovered offsets to the new group.
All reads go through the fetch API and never touch the group coordinator.
Defined Under Namespace
Modules: Errors
Constant Summary
Constants inherited from Admin
Instance Attribute Summary
Attributes inherited from Admin
Class Method Summary

- .affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>
  Sorted consumer group names.
- .affected_partitions(broker_id) ⇒ Array<Integer>
  Sorted partition numbers.
- .coordinator_for(consumer_group_id) ⇒ Hash
  Coordinator broker info.
- .offsets_partition_for(consumer_group_id) ⇒ Integer
  __consumer_offsets partition number.
- .read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}
Instance Method Summary

- #affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>
  Scans a __consumer_offsets partition and returns consumer group names that have active committed offsets.
- #affected_partitions(broker_id) ⇒ Array<Integer>
  Returns all __consumer_offsets partitions led by a given broker.
- #coordinator_for(consumer_group_id) ⇒ Hash{Symbol => Object}
  Returns which broker is the coordinator for a consumer group.
- #offsets_partition_for(consumer_group_id) ⇒ Integer
  Determines which __consumer_offsets partition holds data for a given consumer group.
- #read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}
  Reads committed offsets for a consumer group directly from the __consumer_offsets internal topic, bypassing the group coordinator.
Methods inherited from Admin
cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, read_watermark_offsets, #read_watermark_offsets, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer
Constructor Details
This class inherits a constructor from Karafka::Admin
Class Method Details
.affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>
Returns sorted consumer group names.
# File 'lib/karafka/pro/admin/recovery.rb', line 111

def affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)
  new.affected_groups(partition, last_committed_at: last_committed_at)
end
.affected_partitions(broker_id) ⇒ Array<Integer>
Returns sorted partition numbers.
# File 'lib/karafka/pro/admin/recovery.rb', line 118

def affected_partitions(broker_id)
  new.affected_partitions(broker_id)
end
.coordinator_for(consumer_group_id) ⇒ Hash
Returns coordinator broker info.
# File 'lib/karafka/pro/admin/recovery.rb', line 102

def coordinator_for(consumer_group_id)
  new.coordinator_for(consumer_group_id)
end
.offsets_partition_for(consumer_group_id) ⇒ Integer
Returns __consumer_offsets partition number.
# File 'lib/karafka/pro/admin/recovery.rb', line 95

def offsets_partition_for(consumer_group_id)
  new.offsets_partition_for(consumer_group_id)
end
.read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}
# File 'lib/karafka/pro/admin/recovery.rb', line 85

def read_committed_offsets(
  consumer_group_id,
  last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET
)
  new.read_committed_offsets(consumer_group_id, last_committed_at: last_committed_at)
end
Instance Method Details
#affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>
Scans a __consumer_offsets partition and returns consumer group names that have active committed offsets. Groups where all offsets have been tombstoned (deleted) within the scan window are excluded.
Use this to discover which consumer groups are affected when a coordinator broker fails. Combined with #affected_partitions, this gives the full blast radius of a broker outage: first find which __consumer_offsets partitions the failed broker leads, then scan each partition to discover all affected consumer groups.
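The tombstone handling can be illustrated with plain Ruby over simulated parsed records. The record shape below is a simplified stand-in for what the internal parse_offset_commit helper yields, not the library's actual structure:

```ruby
# Simulated offset-commit records; offset: nil represents a tombstone
records = [
  { group: 'orders', topic: 'events', partition: 0, offset: 100 },
  { group: 'billing', topic: 'events', partition: 0, offset: 7 },
  # 'billing' is fully tombstoned within the scan window
  { group: 'billing', topic: 'events', partition: 0, offset: nil }
]

# committed[group][topic][partition] = offset, last write wins
committed = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

records.each do |r|
  if r[:offset].nil?
    committed[r[:group]][r[:topic]].delete(r[:partition])
    committed[r[:group]].delete(r[:topic]) if committed[r[:group]][r[:topic]].empty?
  else
    committed[r[:group]][r[:topic]][r[:partition]] = r[:offset]
  end
end

active = committed.reject { |_, topics| topics.empty? }.keys.sort
# => ['orders']
```

Because the tombstone for 'billing' arrives after its commit, last-write-wins empties that group's hash, and only groups with surviving offsets are reported as affected.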
# File 'lib/karafka/pro/admin/recovery.rb', line 325

def affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)
  count = offsets_partition_count

  unless partition >= 0 && partition < count
    raise(
      Errors::PartitionOutOfRangeError,
      "Partition #{partition} is out of range (0...#{count})"
    )
  end

  # Track offsets per group with last-write-wins so fully tombstoned groups
  # (all offsets deleted) are excluded from the result
  committed = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

  iterator = Pro::Iterator.new(
    { OFFSETS_TOPIC => { partition => last_committed_at } },
    settings: @custom_kafka
  )

  iterator.each do |message|
    next unless message.raw_key

    parsed = parse_offset_commit(message)

    next unless parsed

    group = parsed[:group]

    if parsed[:offset].nil?
      committed[group][parsed[:topic]].delete(parsed[:partition])
      committed[group].delete(parsed[:topic]) if committed[group][parsed[:topic]].empty?
    else
      committed[group][parsed[:topic]][parsed[:partition]] = parsed[:offset]
    end
  end

  committed.select { |_, topics| !topics.empty? }.keys.sort
end
#affected_partitions(broker_id) ⇒ Array<Integer>
Returns all __consumer_offsets partitions led by a given broker. Pure metadata lookup that does not scan any topic data.
Use this as the first step in assessing the blast radius of a broker outage. The returned partition numbers can be passed to #affected_groups to discover all consumer groups that need recovery or migration.
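The filter itself is a straightforward select over partition metadata. A toy sketch (the hash shape mirrors rdkafka-style metadata entries but is illustrative only):

```ruby
# Toy __consumer_offsets partition metadata: partition id and leader broker id
partitions = [
  { partition_id: 0, leader: 1 },
  { partition_id: 1, leader: 2 },
  { partition_id: 2, leader: 1 }
]

failed_broker_id = 1

# Partitions led by the failed broker; each maps to a set of stuck groups
led_by_failed = partitions
  .select { |p| p[:leader] == failed_broker_id }
  .map { |p| p[:partition_id] }
  .sort
# => [0, 2]
```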
# File 'lib/karafka/pro/admin/recovery.rb', line 376

def affected_partitions(broker_id)
  metadata = cluster_info

  offsets_topic = metadata.topics.find { |t| t[:topic_name] == OFFSETS_TOPIC }

  unless offsets_topic
    raise(
      Errors::MetadataError,
      "Could not retrieve metadata for '#{OFFSETS_TOPIC}'"
    )
  end

  offsets_topic[:partitions]
    .select { |p| p[:leader] == broker_id }
    .map { |p| p[:partition_id] }
    .sort
end
#coordinator_for(consumer_group_id) ⇒ Hash{Symbol => Object}
Returns which broker is the coordinator for a consumer group. The coordinator is the leader of the __consumer_offsets partition assigned to this group. Pure metadata lookup that does not scan any topic data.
Use this to quickly identify which broker is responsible for a consumer group. During an incident, this tells you whether a specific group is affected by a broker outage. If the returned broker is the one that is down or in a FAILED state, the group is stuck and needs migration.
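In effect the lookup is two metadata joins: target partition to leader id, then leader id to broker host. A toy sketch with hypothetical broker entries (the shapes are illustrative, not the library's exact metadata types):

```ruby
target_partition = 7

# Toy metadata: the __consumer_offsets partition and the cluster's brokers
partitions = [{ partition_id: 7, leader: 2 }]
brokers = [
  { broker_id: 1, broker_name: 'kafka-1.internal', broker_port: 9092 },
  { broker_id: 2, broker_name: 'kafka-2.internal', broker_port: 9092 }
]

# Join 1: partition -> leader broker id
leader_id = partitions.find { |p| p[:partition_id] == target_partition }[:leader]
# Join 2: leader broker id -> broker entry
broker = brokers.find { |b| b[:broker_id] == leader_id }

coordinator = {
  partition: target_partition,
  broker_id: broker[:broker_id],
  broker_host: "#{broker[:broker_name]}:#{broker[:broker_port]}"
}
# => { partition: 7, broker_id: 2, broker_host: 'kafka-2.internal:9092' }
```

If coordinator[:broker_id] matches the broker that is down or FAILED, the group is stuck and is a candidate for migration.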
# File 'lib/karafka/pro/admin/recovery.rb', line 249

def coordinator_for(consumer_group_id)
  target_partition = offsets_partition_for(consumer_group_id)
  metadata = cluster_info

  offsets_topic = metadata.topics.find { |t| t[:topic_name] == OFFSETS_TOPIC }

  unless offsets_topic
    raise(
      Errors::MetadataError,
      "Could not retrieve metadata for '#{OFFSETS_TOPIC}'"
    )
  end

  partitions = offsets_topic[:partitions]
  partition_info = partitions.find { |p| p[:partition_id] == target_partition }

  unless partition_info
    raise(
      Errors::MetadataError,
      "Could not find partition #{target_partition} in '#{OFFSETS_TOPIC}'"
    )
  end

  leader_id = partition_info[:leader]

  broker = metadata.brokers.find do |b|
    if b.is_a?(Hash)
      (b[:broker_id] || b[:node_id]) == leader_id
    else
      b.node_id == leader_id
    end
  end

  unless broker
    raise(
      Errors::MetadataError,
      "Could not find broker #{leader_id} in cluster metadata"
    )
  end

  if broker.is_a?(Hash)
    host = broker[:broker_name] || broker[:host]
    port = broker[:broker_port] || broker[:port]
    broker_host = "#{host}:#{port}"
    broker_id = broker[:broker_id] || broker[:node_id]
  else
    broker_host = "#{broker.host}:#{broker.port}"
    broker_id = broker.node_id
  end

  {
    partition: target_partition,
    broker_id: broker_id,
    broker_host: broker_host
  }
end
#offsets_partition_for(consumer_group_id) ⇒ Integer
# File 'lib/karafka/pro/admin/recovery.rb', line 220

def offsets_partition_for(consumer_group_id)
  h = java_hash_code(consumer_group_id)
  # Kafka's Utils.abs: Integer.MIN_VALUE maps to 0
  h = (h == -2_147_483_648) ? 0 : h.abs
  h % offsets_partition_count
end
#read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}
All consumers in this group should be fully stopped before calling this method. While normally they would already be stopped due to a coordinator failure, if the cluster recovers concurrently, active consumers may commit newer offsets that this scan will not capture, resulting in stale data.
This method may take a noticeable amount of time to complete because it scans the raw __consumer_offsets log from last_committed_at forward to the end. The duration depends on the volume of offset commits in the scan window across all consumer groups that hash to the same __consumer_offsets partition.
The result only contains topic-partitions that had offsets committed after last_committed_at. If a partition never had an offset committed, or if the commit happened before last_committed_at, it will be absent from the result. It is the caller’s responsibility to verify that all expected topic-partitions are present before using the result for migration or other operations.
Reads committed offsets for a consumer group directly from the __consumer_offsets internal topic, bypassing the group coordinator. Only scans the single __consumer_offsets partition that holds data for the given group (determined by Java’s String#hashCode mod partition count), starting from last_committed_at and reading forward to EOF. Later records overwrite earlier ones so the result always reflects the most recent committed offset per partition.
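Given the caveat above about absent topic-partitions, a caller can diff the recovered hash against the expected assignment before seeking the new group. A plain-Ruby sketch (topic names and partition lists here are hypothetical):

```ruby
# Topic-partitions the group is expected to own (hypothetical)
expected = { 'orders' => [0, 1, 2] }

# Shape returned by read_committed_offsets: topic => { partition => offset }
recovered = { 'orders' => { 0 => 100, 2 => 250 } }

# Every expected topic-partition that has no recovered offset
missing = expected.flat_map do |topic, parts|
  parts
    .reject { |p| recovered.fetch(topic, {}).key?(p) }
    .map { |p| [topic, p] }
end
# => [['orders', 1]] (partition 1 had no commit in the scan window)
```

A non-empty missing list means either widening last_committed_at and rescanning, or choosing starting offsets for those partitions by some other means before migrating.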
# File 'lib/karafka/pro/admin/recovery.rb', line 176

def read_committed_offsets(
  consumer_group_id,
  last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET
)
  committed = Hash.new { |h, k| h[k] = {} }
  target_partition = offsets_partition_for(consumer_group_id)

  iterator = Pro::Iterator.new(
    { OFFSETS_TOPIC => { target_partition => last_committed_at } },
    settings: @custom_kafka
  )

  iterator.each do |message|
    next unless message.raw_key

    parsed = parse_offset_commit(message)

    next unless parsed
    next unless parsed[:group] == consumer_group_id

    if parsed[:offset].nil?
      # Tombstone — offset was deleted, remove from results
      committed[parsed[:topic]].delete(parsed[:partition])
      committed.delete(parsed[:topic]) if committed[parsed[:topic]].empty?
    else
      # Last write wins — scanning forward means we naturally end up with the most
      # recent commit per partition
      committed[parsed[:topic]][parsed[:partition]] = parsed[:offset]
    end
  end

  committed.sort.to_h.transform_values { |parts| parts.sort.to_h }
end