Class: Karafka::Pro::Admin::Recovery

Inherits:
Admin
  • Object
Defined in:
lib/karafka/pro/admin/recovery.rb,
lib/karafka/pro/admin/recovery/errors.rb

Overview

Note:

These methods should NOT be used unless you are experiencing issues that require manual intervention. Misuse can lead to data loss or other problems.

Consumer group recovery toolkit.

Provides coordinator-bypass offset reading and blast-radius assessment for scenarios where the Kafka group coordinator is in a FAILED state and normal admin APIs return NOT_COORDINATOR or time out.

Works for any coordinator failure scenario:

- KAFKA-19862 (compaction race during coordinator load)
- Broker OOM / GC pause making coordinator unreachable
- Network partition isolating the coordinator broker
- Any future bug that transitions a coordinator shard to FAILED

Each consumer group is assigned to a specific __consumer_offsets partition (and therefore a specific coordinator broker) based on its name. When that coordinator enters a FAILED state, all operations for the group - joins, heartbeats, offset commits, and offset fetches - are stuck until the coordinator recovers.

A common recovery strategy is migrating to a new consumer group with a different name, which causes Kafka to hash it to a (likely) different __consumer_offsets partition served by a healthy coordinator. This class provides the tools to:

1. Read committed offsets directly from the raw __consumer_offsets log (bypassing the
   broken coordinator) via {#read_committed_offsets}
2. Assess blast radius: which broker coordinates a group ({#coordinator_for}), which
   partitions a broker leads ({#affected_partitions}), and which groups are affected
   ({#affected_groups})

To complete the migration, use Admin::ConsumerGroups.seek to write the recovered offsets to the new group.

All reads go through the fetch API and never touch the group coordinator.

Defined Under Namespace

Modules: Errors

Constant Summary

Constants inherited from Admin

Admin::Recovery

Instance Attribute Summary

Attributes inherited from Admin

#custom_kafka

Class Method Summary

Instance Method Summary

Methods inherited from Admin

cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_lags_with_offsets, #read_lags_with_offsets, read_topic, #read_topic, read_watermark_offsets, #read_watermark_offsets, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, trigger_rebalance, #trigger_rebalance, with_admin, #with_admin, with_consumer, #with_consumer

Constructor Details

This class inherits a constructor from Karafka::Admin

Class Method Details

.affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>

Returns sorted consumer group names.

Parameters:

  • partition (Integer)

    __consumer_offsets partition to scan

  • last_committed_at (Time) (defaults to: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)

    approximate time of last successful offset commit (default: 1 hour ago). A good rule of thumb is the crash time minus 10 minutes

Returns:

  • (Array<String>)

    sorted consumer group names

# File 'lib/karafka/pro/admin/recovery.rb', line 111

def affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)
  new.affected_groups(partition, last_committed_at: last_committed_at)
end

.affected_partitions(broker_id) ⇒ Array<Integer>

Returns sorted partition numbers.

Parameters:

  • broker_id (Integer)

    broker node id

Returns:

  • (Array<Integer>)

    sorted partition numbers

# File 'lib/karafka/pro/admin/recovery.rb', line 118

def affected_partitions(broker_id)
  new.affected_partitions(broker_id)
end

.coordinator_for(consumer_group_id) ⇒ Hash

Returns coordinator broker info.

Parameters:

  • consumer_group_id (String)

    consumer group to look up

Returns:

  • (Hash)

    coordinator broker info

# File 'lib/karafka/pro/admin/recovery.rb', line 102

def coordinator_for(consumer_group_id)
  new.coordinator_for(consumer_group_id)
end

.offsets_partition_for(consumer_group_id) ⇒ Integer

Returns __consumer_offsets partition number.

Parameters:

  • consumer_group_id (String)

    consumer group id

Returns:

  • (Integer)

    __consumer_offsets partition number

# File 'lib/karafka/pro/admin/recovery.rb', line 95

def offsets_partition_for(consumer_group_id)
  new.offsets_partition_for(consumer_group_id)
end

.read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}

Parameters:

  • consumer_group_id (String)

    consumer group to read offsets for

  • last_committed_at (Time) (defaults to: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)

    approximate time of last successful offset commit (default: 1 hour ago). A good rule of thumb is the crash time minus 10 minutes

Returns:

  • (Hash{String => Hash{Integer => Integer}})

# File 'lib/karafka/pro/admin/recovery.rb', line 85

def read_committed_offsets(
  consumer_group_id,
  last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET
)
  new.read_committed_offsets(consumer_group_id, last_committed_at: last_committed_at)
end

Instance Method Details

#affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Array<String>

Scans a __consumer_offsets partition and returns consumer group names that have active committed offsets. Groups where all offsets have been tombstoned (deleted) within the scan window are excluded.

Use this to discover which consumer groups are affected when a coordinator broker fails. Combined with #affected_partitions, this gives the full blast radius of a broker outage: first find which __consumer_offsets partitions the failed broker leads, then scan each partition to discover all affected consumer groups.

Examples:

Find all groups on partition 17

Karafka::Admin::Recovery.affected_groups(17)
#=> ["group-a", "group-b", "group-c"]

Full blast radius of a broker outage

partitions = Karafka::Admin::Recovery.affected_partitions(failed_broker_id)
all_affected = partitions.flat_map do |p|
  Karafka::Admin::Recovery.affected_groups(p)
end.uniq

Parameters:

  • partition (Integer)

    __consumer_offsets partition to scan

  • last_committed_at (Time) (defaults to: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)

    approximate time of last successful offset commit (default: 1 hour ago). A good rule of thumb is the crash time minus 10 minutes

Returns:

  • (Array<String>)

    sorted list of consumer group names with active offsets



# File 'lib/karafka/pro/admin/recovery.rb', line 325

def affected_groups(partition, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)
  count = offsets_partition_count

  unless partition >= 0 && partition < count
    raise(
      Errors::PartitionOutOfRangeError,
      "Partition #{partition} is out of range (0...#{count})"
    )
  end

  # Track offsets per group with last-write-wins so fully tombstoned groups
  # (all offsets deleted) are excluded from the result
  committed = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

  iterator = Pro::Iterator.new(
    { OFFSETS_TOPIC => { partition => last_committed_at } },
    settings: @custom_kafka
  )

  iterator.each do |message|
    next unless message.raw_key

    parsed = parse_offset_commit(message)
    next unless parsed

    group = parsed[:group]

    if parsed[:offset].nil?
      committed[group][parsed[:topic]].delete(parsed[:partition])
      committed[group].delete(parsed[:topic]) if committed[group][parsed[:topic]].empty?
    else
      committed[group][parsed[:topic]][parsed[:partition]] = parsed[:offset]
    end
  end

  committed.select { |_, topics| !topics.empty? }.keys.sort
end
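The tombstone handling above can be illustrated in isolation. The following is a minimal, self-contained sketch of the last-write-wins fold; the record hashes stand in for what parse_offset_commit would return and are invented for illustration, not real wire data:

```ruby
# Simulated stream of parsed offset-commit records, oldest first.
# An offset of nil represents a tombstone (offset deleted).
records = [
  { group: 'group-a', topic: 'events', partition: 0, offset: 100 },
  { group: 'group-a', topic: 'events', partition: 0, offset: 150 },
  { group: 'group-b', topic: 'orders', partition: 0, offset: 42 },
  { group: 'group-b', topic: 'orders', partition: 0, offset: nil } # tombstone
]

# Nested auto-vivifying hash: group => topic => partition => offset
committed = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

records.each do |r|
  if r[:offset].nil?
    # Tombstone removes the partition; drop the topic once it is empty
    committed[r[:group]][r[:topic]].delete(r[:partition])
    committed[r[:group]].delete(r[:topic]) if committed[r[:group]][r[:topic]].empty?
  else
    # Last write wins: a later commit overwrites an earlier one
    committed[r[:group]][r[:topic]][r[:partition]] = r[:offset]
  end
end

active = committed.select { |_, topics| !topics.empty? }.keys.sort
# group-b is fully tombstoned, so only group-a survives
active #=> ["group-a"]
```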

#affected_partitions(broker_id) ⇒ Array<Integer>

Returns all __consumer_offsets partitions led by a given broker. Pure metadata lookup that does not scan any topic data.

Use this as the first step in assessing the blast radius of a broker outage. The returned partition numbers can be passed to #affected_groups to discover all consumer groups that need recovery or migration.

Examples:

Find partitions led by broker 2

Karafka::Admin::Recovery.affected_partitions(2)
#=> [3, 17, 28, 42]

Parameters:

  • broker_id (Integer)

    broker node id

Returns:

  • (Array<Integer>)

    sorted list of __consumer_offsets partition numbers



# File 'lib/karafka/pro/admin/recovery.rb', line 376

def affected_partitions(broker_id)
  metadata = cluster_info

  offsets_topic = metadata.topics.find { |t| t[:topic_name] == OFFSETS_TOPIC }

  unless offsets_topic
    raise(
      Errors::MetadataError,
      "Could not retrieve metadata for '#{OFFSETS_TOPIC}'"
    )
  end

  offsets_topic[:partitions]
    .select { |p| p[:leader] == broker_id }
    .map { |p| p[:partition_id] }
    .sort
end
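The select/map/sort pipeline above is plain metadata filtering. A minimal sketch against a hand-built metadata fragment shaped like cluster_info's topic entries (the partition layout and broker ids are invented for illustration):

```ruby
# Hypothetical metadata fragment for the __consumer_offsets topic
offsets_topic = {
  topic_name: '__consumer_offsets',
  partitions: [
    { partition_id: 3,  leader: 2 },
    { partition_id: 17, leader: 2 },
    { partition_id: 28, leader: 1 },
    { partition_id: 42, leader: 2 }
  ]
}

failed_broker_id = 2

# Keep only partitions led by the failed broker, sorted by partition number
affected = offsets_topic[:partitions]
           .select { |p| p[:leader] == failed_broker_id }
           .map { |p| p[:partition_id] }
           .sort
# => [3, 17, 42]
```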

#coordinator_for(consumer_group_id) ⇒ Hash{Symbol => Object}

Returns which broker is the coordinator for a consumer group. The coordinator is the leader of the __consumer_offsets partition assigned to this group. Pure metadata lookup that does not scan any topic data.

Use this to quickly identify which broker is responsible for a consumer group. During an incident, this tells you whether a specific group is affected by a broker outage. If the returned broker is the one that is down or in a FAILED state, the group is stuck and needs migration.

Examples:

Find coordinator for a group

Karafka::Admin::Recovery.coordinator_for('my-group')
#=> { partition: 17, broker_id: 2, broker_host: "broker2:9092" }

Check if a group is affected by a broker outage

info = Karafka::Admin::Recovery.coordinator_for('my-group')
if info[:broker_id] == failed_broker_id
  puts "Group 'my-group' is stuck on failed broker #{info[:broker_host]}"
end

Parameters:

  • consumer_group_id (String)

    consumer group to look up

Returns:

  • (Hash{Symbol => Object})

    coordinator info with :partition, :broker_id, and :broker_host keys



# File 'lib/karafka/pro/admin/recovery.rb', line 249

def coordinator_for(consumer_group_id)
  target_partition = offsets_partition_for(consumer_group_id)
  metadata = cluster_info

  offsets_topic = metadata.topics.find { |t| t[:topic_name] == OFFSETS_TOPIC }

  unless offsets_topic
    raise(
      Errors::MetadataError,
      "Could not retrieve metadata for '#{OFFSETS_TOPIC}'"
    )
  end

  partitions = offsets_topic[:partitions]
  partition_info = partitions.find { |p| p[:partition_id] == target_partition }

  unless partition_info
    raise(
      Errors::MetadataError,
      "Could not find partition #{target_partition} in '#{OFFSETS_TOPIC}'"
    )
  end

  leader_id = partition_info[:leader]

  broker = metadata.brokers.find do |b|
    if b.is_a?(Hash)
      (b[:broker_id] || b[:node_id]) == leader_id
    else
      b.node_id == leader_id
    end
  end

  unless broker
    raise(
      Errors::MetadataError,
      "Could not find broker #{leader_id} in cluster metadata"
    )
  end

  if broker.is_a?(Hash)
    host = broker[:broker_name] || broker[:host]
    port = broker[:broker_port] || broker[:port]
    broker_host = "#{host}:#{port}"
    broker_id = broker[:broker_id] || broker[:node_id]
  else
    broker_host = "#{broker.host}:#{broker.port}"
    broker_id = broker.node_id
  end

  { partition: target_partition, broker_id: broker_id, broker_host: broker_host }
end

#offsets_partition_for(consumer_group_id) ⇒ Integer

Determines which __consumer_offsets partition holds data for a given consumer group. Kafka uses Utils.abs(String#hashCode) % numPartitions, where hashCode is Java’s 32-bit signed hash: s[0]*31^(n-1) + s[1]*31^(n-2) + … + s[n-1], computed with int32 overflow semantics. Utils.abs maps Integer.MIN_VALUE to 0.

Examples:

Check which partition stores offsets for a group

Karafka::Admin::Recovery.offsets_partition_for('my-group')
#=> 17

Parameters:

  • consumer_group_id (String)

    consumer group id

Returns:

  • (Integer)

    __consumer_offsets partition number



# File 'lib/karafka/pro/admin/recovery.rb', line 220

def offsets_partition_for(consumer_group_id)
  h = java_hash_code(consumer_group_id)
  # Kafka's Utils.abs: Integer.MIN_VALUE maps to 0
  h = (h == -2_147_483_648) ? 0 : h.abs
  h % offsets_partition_count
end
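The java_hash_code helper used above is private and not shown here. A minimal Ruby sketch of Java's String#hashCode with int32 overflow semantics might look like the following; the standalone method signature and the partition count default of 50 (Kafka's default for offsets.topic.num.partitions) are assumptions for illustration. Note that Java hashes UTF-16 code units; for ASCII group ids, codepoints and code units coincide:

```ruby
# Java's String#hashCode: s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1],
# computed as a 32-bit signed integer with two's-complement wraparound
def java_hash_code(string)
  hash = 0
  string.each_char do |char|
    hash = (hash * 31 + char.ord) & 0xFFFFFFFF # wrap to 32 bits
  end
  # Reinterpret the unsigned 32-bit value as a signed int32
  hash >= 0x80000000 ? hash - 0x100000000 : hash
end

# Illustrative standalone version; 50 is assumed as the partition count
def offsets_partition_for(group_id, partitions = 50)
  h = java_hash_code(group_id)
  # Kafka's Utils.abs: Integer.MIN_VALUE maps to 0
  h = h == -2_147_483_648 ? 0 : h.abs
  h % partitions
end

java_hash_code('abc') #=> 96354 (matches Java's "abc".hashCode)
```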

#read_committed_offsets(consumer_group_id, last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET) ⇒ Hash{String => Hash{Integer => Integer}}

Note:

All consumers in this group should be fully stopped before calling this method. While normally they would already be stopped due to a coordinator failure, if the cluster recovers concurrently, active consumers may commit newer offsets that this scan will not capture, resulting in stale data.

Note:

This method may take a noticeable amount of time to complete because it scans the raw __consumer_offsets log from last_committed_at forward to the end. The duration depends on the volume of offset commits in the scan window across all consumer groups that hash to the same __consumer_offsets partition.

Note:

The result only contains topic-partitions that had offsets committed after last_committed_at. If a partition never had an offset committed, or if the commit happened before last_committed_at, it will be absent from the result. It is the caller’s responsibility to verify that all expected topic-partitions are present before using the result for migration or other operations.

Reads committed offsets for a consumer group directly from the __consumer_offsets internal topic, bypassing the group coordinator. Only scans the single __consumer_offsets partition that holds data for the given group (determined by Java’s String#hashCode mod partition count), starting from last_committed_at and reading forward to EOF. Later records overwrite earlier ones so the result always reflects the most recent committed offset per partition.

Examples:

Read offsets for the last hour (default)

Karafka::Admin::Recovery.read_committed_offsets('sync')
#=> { 'events' => { 0 => 1400, 1 => 1402, ... } }

Read offsets for the last 6 hours

Karafka::Admin::Recovery.read_committed_offsets(
  'sync', last_committed_at: Time.now - 6 * 3600
)

Read offsets from a specific point in time

Karafka::Admin::Recovery.read_committed_offsets('sync', last_committed_at: Time.new(2025, 3, 1))

Migrate a stuck consumer group to a new name (two-step workflow)

# Step 1: Read committed offsets from the broken group (bypasses coordinator)
offsets = Karafka::Admin::Recovery.read_committed_offsets('sync')
#=> { 'events' => { 0 => 1400, 1 => 1402 }, 'orders' => { 0 => 890 } }

# Step 2: Inspect the recovered offsets — verify all expected topics and partitions
# are present and the offset values look reasonable before committing them

# Step 3: Write the offsets to the target group using standard Admin APIs
Karafka::Admin::ConsumerGroups.seek('sync_v2', offsets)

# Now reconfigure your consumers to use 'sync_v2' and restart them

Parameters:

  • consumer_group_id (String)

    consumer group to read offsets for

  • last_committed_at (Time) (defaults to: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET)

    approximate time of last successful offset commit (default: 1 hour ago). A good rule of thumb is the crash time minus 10 minutes

Returns:

  • (Hash{String => Hash{Integer => Integer}})

    { topic => { partition => committed_offset } }



# File 'lib/karafka/pro/admin/recovery.rb', line 176

def read_committed_offsets(
  consumer_group_id,
  last_committed_at: Time.now - DEFAULT_LAST_COMMITTED_AT_OFFSET
)
  committed = Hash.new { |h, k| h[k] = {} }
  target_partition = offsets_partition_for(consumer_group_id)

  iterator = Pro::Iterator.new(
    { OFFSETS_TOPIC => { target_partition => last_committed_at } },
    settings: @custom_kafka
  )

  iterator.each do |message|
    next unless message.raw_key

    parsed = parse_offset_commit(message)
    next unless parsed
    next unless parsed[:group] == consumer_group_id

    if parsed[:offset].nil?
      # Tombstone — offset was deleted, remove from results
      committed[parsed[:topic]].delete(parsed[:partition])
      committed.delete(parsed[:topic]) if committed[parsed[:topic]].empty?
    else
      # Last write wins — scanning forward means we naturally end up with the most
      # recent commit per partition
      committed[parsed[:topic]][parsed[:partition]] = parsed[:offset]
    end
  end

  committed.sort.to_h.transform_values { |parts| parts.sort.to_h }
end