Class: Karafka::Admin::ConsumerGroups

Inherits:
Karafka::Admin show all
Defined in:
lib/karafka/admin/consumer_groups.rb

Overview

Consumer group administration operations. Provides methods to manage Kafka consumer groups, including offset management, migration, and introspection.

Constant Summary

Constants inherited from Karafka::Admin

Recovery

Instance Attribute Summary

Attributes inherited from Karafka::Admin

#custom_kafka

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Karafka::Admin

cluster_info, #cluster_info, copy_consumer_group, #copy_consumer_group, create_partitions, #create_partitions, create_topic, #create_topic, delete_consumer_group, #delete_consumer_group, delete_topic, #delete_topic, #initialize, plan_topic_replication, #plan_topic_replication, read_topic, #read_topic, read_watermark_offsets, #read_watermark_offsets, rename_consumer_group, #rename_consumer_group, seek_consumer_group, #seek_consumer_group, topic_info, #topic_info, with_admin, #with_admin, with_consumer, #with_consumer

Constructor Details

This class inherits a constructor from Karafka::Admin

Class Method Details

.copy(previous_name, new_name, topics) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to copy offsets

See Also:

  • #copy

# File 'lib/karafka/admin/consumer_groups.rb', line 32

def copy(previous_name, new_name, topics)
  new.copy(previous_name, new_name, topics)
end

.delete(consumer_group_id) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group name

See Also:

  • #delete

# File 'lib/karafka/admin/consumer_groups.rb', line 47

def delete(consumer_group_id)
  new.delete(consumer_group_id)
end

.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object

Parameters:

  • consumer_groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

    hash mapping consumer group names to arrays of topics

  • active_topics_only (Boolean) (defaults to: true)

    if set to false, inactive topics will also be selected

See Also:

  • #read_lags_with_offsets

# File 'lib/karafka/admin/consumer_groups.rb', line 61

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
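As a sketch, the expected argument maps consumer group names to arrays of topic names; the group and topic names below are purely illustrative:

```ruby
# Hypothetical consumer group and topic names, purely illustrative
consumer_groups_with_topics = {
  'example_group_a' => %w[orders payments],
  'example_group_b' => %w[notifications]
}

# Each value must be an array of topic names to query for that group
consumer_groups_with_topics.each_value do |topics|
  raise ArgumentError, 'topics must be an Array' unless topics.is_a?(Array)
end
```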

.rename(previous_name, new_name, topics, delete_previous: true) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename

See Also:

  • #rename

# File 'lib/karafka/admin/consumer_groups.rb', line 41

def rename(previous_name, new_name, topics, delete_previous: true)
  new.rename(previous_name, new_name, topics, delete_previous: delete_previous)
end

.seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group for which we want to move offsets

  • topics_with_partitions_and_offsets (Hash)

    hash with topics and settings

See Also:

  • #seek

# File 'lib/karafka/admin/consumer_groups.rb', line 24

def seek(consumer_group_id, topics_with_partitions_and_offsets)
  new.seek(consumer_group_id, topics_with_partitions_and_offsets)
end

.trigger_rebalance(consumer_group_id) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group id to trigger rebalance for

See Also:

  • #trigger_rebalance

# File 'lib/karafka/admin/consumer_groups.rb', line 53

def trigger_rebalance(consumer_group_id)
  new.trigger_rebalance(consumer_group_id)
end

Instance Method Details

#copy(previous_name, new_name, topics) ⇒ Boolean

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Note:

If the new consumer group exists, the old offsets will be added to it.

Takes a consumer group and its topics and copies all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to copy offsets

Returns:

  • (Boolean)

    true if anything was migrated, otherwise false



# File 'lib/karafka/admin/consumer_groups.rb', line 213

def copy(previous_name, new_name, topics)
  remap = Hash.new { |h, k| h[k] = {} }

  old_lags = read_lags_with_offsets({ previous_name => topics })

  return false if old_lags.empty?
  return false if old_lags.values.all? { |topic_data| topic_data.values.all?(&:empty?) }

  read_lags_with_offsets({ previous_name => topics })
    .fetch(previous_name)
    .each do |topic, partitions|
      partitions.each do |partition_id, details|
        offset = details[:offset]

        # No offset on this partition
        next if offset.negative?

        remap[topic][partition_id] = offset
      end
    end

  seek(new_name, remap)

  true
end
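The remap-building step above can be sketched in isolation; the sample lag data below is hypothetical and mirrors the per-group shape returned by #read_lags_with_offsets:

```ruby
# Hypothetical per-topic lag data for one consumer group, in the shape
# returned by #read_lags_with_offsets for a single group
topic_data = {
  'events' => {
    0 => { offset: 42, lag: 0 },
    1 => { offset: -1, lag: -1 } # no committed offset on this partition
  }
}

remap = Hash.new { |h, k| h[k] = {} }

topic_data.each do |topic, partitions|
  partitions.each do |partition_id, details|
    offset = details[:offset]

    # Partitions without a committed offset (-1) are skipped so they are
    # not seeked on the target group
    next if offset.negative?

    remap[topic][partition_id] = offset
  end
end

remap # => { 'events' => { 0 => 42 } }
```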

#delete(consumer_group_id) ⇒ void

Note:

This method should not be used on a running consumer group as it will not yield any results.

This method returns an undefined value.

Removes given consumer group (if exists)

Parameters:

  • consumer_group_id (String)

    consumer group name



# File 'lib/karafka/admin/consumer_groups.rb', line 276

def delete(consumer_group_id)
  with_admin do |admin|
    handler = admin.delete_group(consumer_group_id)
    handler.wait(max_wait_timeout_ms: max_wait_time_ms)
  end
end

#read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}

Note:

For topics that do not exist, topic details will be set to an empty hash

Note:

For topics that exist but were never consumed by a given CG, we set `-1` as the lag and offset on each of the partitions that were not consumed.

Note:

This lag reporting is for committed lags and is “Kafka-centric”, meaning that this represents lags from Kafka perspective and not the consumer. They may differ.

Reads lags and offsets for given topics in the context of consumer groups defined in the routing

Parameters:

  • consumer_groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

    hash mapping consumer group names to arrays of topics to query per consumer group

  • active_topics_only (Boolean) (defaults to: true)

    if set to false and routing topics are used, topics marked as inactive in routing will also be selected

Returns:

  • (Hash{String => Hash{Integer => Hash{Integer => Object}}})

    hash where the top-level keys are consumer group names and the values are hashes of topics, each containing per-partition lags and offsets



# File 'lib/karafka/admin/consumer_groups.rb', line 372

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  # We first fetch all the topics with partitions count that exist in the cluster so we
  # do not query for topics that do not exist and so we can get partitions count for all
  # the topics we may need. The non-existent and not consumed will be filled at the end
  existing_topics = cluster_info.topics.to_h do |topic|
    [topic[:topic_name], topic[:partition_count]]
  end.freeze

  # If no expected CGs, we use all from routing that have active topics
  if consumer_groups_with_topics.empty?
    consumer_groups_with_topics = Karafka::App.routes.to_h do |cg|
      cg_topics = cg.topics.select do |cg_topic|
        active_topics_only ? cg_topic.active? : true
      end

      [cg.id, cg_topics.map(&:name)]
    end
  end

  # We make a copy because we will remove ones with non-existing topics
  # We keep original requested consumer groups with topics for later backfilling
  cgs_with_topics = consumer_groups_with_topics.dup
  cgs_with_topics.transform_values!(&:dup)

  # We can query only topics that do exist, this is why we are cleaning those that do not
  # exist
  cgs_with_topics.each_value do |requested_topics|
    requested_topics.delete_if { |topic| !existing_topics.include?(topic) }
  end

  groups_lags = Hash.new { |h, k| h[k] = {} }
  groups_offs = Hash.new { |h, k| h[k] = {} }

  cgs_with_topics.each do |cg, topics|
    # Do not add to tpl topics that do not exist
    next if topics.empty?

    tpl = Rdkafka::Consumer::TopicPartitionList.new

    with_consumer("group.id": cg) do |consumer|
      topics.each { |topic| tpl.add_topic(topic, existing_topics[topic]) }

      commit_offsets = consumer.committed(tpl)

      commit_offsets.to_h.each do |topic, partitions|
        groups_offs[cg][topic] = {}

        partitions.each do |partition|
          # -1 when no offset is stored
          groups_offs[cg][topic][partition.partition] = partition.offset || -1
        end
      end

      consumer.lag(commit_offsets).each do |topic, partitions_lags|
        groups_lags[cg][topic] = partitions_lags
      end
    end
  end

  consumer_groups_with_topics.each do |cg, topics|
    groups_lags[cg]

    topics.each do |topic|
      groups_lags[cg][topic] ||= {}

      next unless existing_topics.key?(topic)

      # We backfill because there is a case where our consumer group would consume for
      # example only one partition out of 20, rest needs to get -1
      existing_topics[topic].times do |partition_id|
        groups_lags[cg][topic][partition_id] ||= -1
      end
    end
  end

  merged = Hash.new { |h, k| h[k] = {} }

  groups_lags.each do |cg, topics|
    topics.each do |topic, partitions|
      merged[cg][topic] = {}

      partitions.each do |partition, lag|
        merged[cg][topic][partition] = {
          offset: groups_offs.fetch(cg).fetch(topic).fetch(partition),
          lag: lag
        }
      end
    end
  end

  merged
end
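To illustrate the returned structure, the sketch below traverses a hypothetical result (group, topic, and numbers are assumptions) and sums the lag per consumer group, ignoring never-consumed partitions reported as -1:

```ruby
# Hypothetical sample of the structure returned by #read_lags_with_offsets:
# consumer group => topic => partition => { offset:, lag: }
lags_with_offsets = {
  'example_app_group' => {
    'orders' => {
      0 => { offset: 120, lag: 5 },
      1 => { offset: -1, lag: -1 } # partition never consumed by this group
    }
  }
}

# Sum lag per consumer group, treating never-consumed partitions (-1) as zero
totals = lags_with_offsets.transform_values do |topics|
  topics.values.sum do |partitions|
    partitions.values.sum { |details| [details[:lag], 0].max }
  end
end

totals # => { 'example_app_group' => 5 }
```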

#rename(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

Note:

After the migration, unless `delete_previous` is set to `false`, the old group will be removed.

Note:

If the new consumer group exists, the old offsets will be added to it.

Takes a consumer group and its topics and migrates all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

    should we delete the previous consumer group after the rename

Returns:

  • (Boolean)

    true if the rename (and optional removal) succeeded, or false if there was nothing to rename



# File 'lib/karafka/admin/consumer_groups.rb', line 257

def rename(previous_name, new_name, topics, delete_previous: true)
  copy_result = copy(previous_name, new_name, topics)

  return false unless copy_result
  return copy_result unless delete_previous

  delete(previous_name)

  true
end

#seek(consumer_group_id, topics_with_partitions_and_offsets) ⇒ void

Note:

This method should not be executed on a running consumer group as it creates a “fake” consumer and uses it to move offsets.

This method returns an undefined value.

Moves the offset on a given consumer group and provided topic to the requested location

Examples:

Move a single topic partition nr 1 offset to 100

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => { 1 => 100 } })

Move offsets on all partitions of a topic to 100

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => 100 })

Move offset to 5 seconds ago on partition 2

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => { 2 => 5.seconds.ago } })

Move to the earliest offset on all the partitions of a topic

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => 'earliest' })

Move to the latest (high-watermark) offset on all the partitions of a topic

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => 'latest' })

Move offset of a single partition to earliest

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => { 1 => 'earliest' } })

Move offset of a single partition to latest

Karafka::Admin::ConsumerGroups.seek('group-id', { 'topic' => { 1 => 'latest' } })

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

    hash with topics and settings describing where to move the given consumer. It allows moving particular partitions or whole topics, for example when we want to reset all partitions to a point in time.



# File 'lib/karafka/admin/consumer_groups.rb', line 102

def seek(consumer_group_id, topics_with_partitions_and_offsets)
  tpl_base = {}

  # Normalize the data so we always have all partitions and topics in the same format
  # That is in a format where we have topics and all partitions with their per partition
  # assigned offsets
  topics_with_partitions_and_offsets.each do |topic, partitions_with_offsets|
    tpl_base[topic] = {}

    if partitions_with_offsets.is_a?(Hash)
      tpl_base[topic] = partitions_with_offsets
    else
      topic_info = Topics.new(kafka: @custom_kafka).info(topic)
      topic_info[:partition_count].times do |partition|
        tpl_base[topic][partition] = partitions_with_offsets
      end
    end
  end

  tpl_base.each_value do |partitions|
    partitions.transform_values! do |position|
      # Support both symbol and string based references
      casted_position = position.is_a?(Symbol) ? position.to_s : position

      # This remap allows us to transform some special cases in a reference that can be
      # understood by Kafka
      case casted_position
      # Earliest is not always 0. When compacting/deleting it can be much later, that's why
      # we fetch the oldest possible offset
      # false is treated the same as 'earliest'
      when "earliest", false
        LONG_TIME_AGO
      # Latest will always be the high-watermark offset and we can get it just by getting
      # a future position
      when "latest"
        Time.now + DAY_IN_SECONDS
      # Regular offset case
      else
        position
      end
    end
  end

  tpl = Rdkafka::Consumer::TopicPartitionList.new
  # In case of time based location, we need to do a pre-resolution, that's why we keep it
  # separately
  time_tpl = Rdkafka::Consumer::TopicPartitionList.new

  # Distribute properly the offset type
  tpl_base.each do |topic, partitions_with_offsets|
    partitions_with_offsets.each do |partition, offset|
      target = offset.is_a?(Time) ? time_tpl : tpl
      # We reverse and uniq to make sure that potentially duplicated references are removed
      # in such a way that the newest stays
      target.to_h[topic] ||= []
      target.to_h[topic] << Rdkafka::Consumer::Partition.new(partition, offset)
      target.to_h[topic].reverse!
      target.to_h[topic].uniq!(&:partition)
      target.to_h[topic].reverse!
    end
  end

  settings = { "group.id": consumer_group_id }

  with_consumer(settings) do |consumer|
    # If we have any time based stuff to resolve, we need to do it prior to commits
    unless time_tpl.empty?
      real_offsets = consumer.offsets_for_times(time_tpl)

      real_offsets.to_h.each do |name, results|
        results.each do |result|
          raise(Errors::InvalidTimeBasedOffsetError) unless result

          partition = result.partition

          # Negative offset means we're beyond last message and we need to query for the
          # high watermark offset to get the most recent offset and move there
          if result.offset.negative?
            _, offset = consumer.query_watermark_offsets(name, result.partition)
          else
            # If we get an offset, it means there existed a message close to this time
            # location
            offset = result.offset
          end

          # Since now we have proper offsets, we can add this to the final tpl for commit
          tpl.to_h[name] ||= []
          tpl.to_h[name] << Rdkafka::Consumer::Partition.new(partition, offset)
          tpl.to_h[name].reverse!
          tpl.to_h[name].uniq!(&:partition)
          tpl.to_h[name].reverse!
        end
      end
    end

    consumer.commit_offsets(tpl, async: false)
  end
end
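The position-normalization step can be sketched in isolation. The stand-in values below are assumptions: the real constants live in Karafka::Admin (LONG_TIME_AGO, DAY_IN_SECONDS):

```ruby
# Illustrative stand-ins for the special references used by #seek
long_time_ago = Time.at(0)
day_in_seconds = 24 * 60 * 60

# Normalize a requested position into something Kafka can understand:
# 'earliest'/false become a time far in the past, 'latest' a time in the
# future (resolved later to the high-watermark), anything else passes through
normalize = lambda do |position|
  casted = position.is_a?(Symbol) ? position.to_s : position

  case casted
  when 'earliest', false then long_time_ago
  when 'latest' then Time.now + day_in_seconds
  else casted
  end
end

normalize.call(:earliest) # => Time.at(0)
normalize.call(100)       # => 100
```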

#trigger_rebalance(consumer_group_id) ⇒ void

Note:

This method creates a temporary “fake” consumer that joins the consumer group, triggering a rebalance when it joins and another when it leaves. This should only be used for operational/testing purposes as it causes two rebalances.

Note:

The consumer group does not need to be running for this to work, but if it is, it will experience rebalances.

Note:

The behavior follows the configured rebalance protocol. For cooperative sticky rebalancing or KIP-848 based protocols, there may be no immediate reaction to the rebalance trigger as these protocols allow incremental partition reassignments without stopping all consumers.

Note:

Topics are always detected from the routing configuration. The consumer settings (kafka config) are taken from the first topic in the consumer group to ensure consistency with the actual consumer configuration.

This method returns an undefined value.

Triggers a rebalance for the specified consumer group by briefly joining and leaving

Examples:

Trigger rebalance for a consumer group

Karafka::Admin::ConsumerGroups.trigger_rebalance('my-group')

Parameters:

  • consumer_group_id (String)

    consumer group id to trigger rebalance for

Raises:

  • (Errors::InvalidConfigurationError)

    when the consumer group is not found in routing or has no topics

# File 'lib/karafka/admin/consumer_groups.rb', line 310

def trigger_rebalance(consumer_group_id)
  consumer_group = Karafka::App.routes.find { |cg| cg.id == consumer_group_id }

  unless consumer_group
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' not found in routing"
    )
  end

  topics = consumer_group.topics.map(&:name)

  if topics.empty?
    raise(
      Errors::InvalidConfigurationError,
      "Consumer group '#{consumer_group_id}' has no topics"
    )
  end

  # Get the first topic to extract kafka settings
  first_topic = consumer_group.topics.first

  # Build consumer settings using the consumer group's kafka config from first topic
  # This ensures we use the same settings as the actual consumers
  # Following the same pattern as in Karafka::Connection::Client#build_kafka
  consumer_settings = Setup::AttributesMap.consumer(first_topic.kafka.dup)
  consumer_settings[:"group.id"] = consumer_group.id
  consumer_settings[:"enable.auto.offset.store"] = false
  consumer_settings[:"auto.offset.reset"] ||= first_topic.initial_offset

  with_consumer(consumer_settings) do |consumer|
    # Subscribe to the topics - this triggers the first rebalance
    consumer.subscribe(*topics)

    # Wait briefly (100ms) to allow the rebalance to initiate
    # The actual rebalance happens asynchronously, so we just need to give it a moment
    sleep(0.1)

    # Unsubscribe - this will trigger the second rebalance when the consumer closes
    # The ensure block in with_consumer will handle the unsubscribe and close
  end
end
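The settings precedence used when building the temporary consumer can be sketched as plain hash manipulation; the config values here are illustrative, not taken from a real routing setup:

```ruby
# Hypothetical kafka config, as it might come from the first topic in routing
kafka_config = { "bootstrap.servers": 'localhost:9092' }

consumer_settings = kafka_config.dup
consumer_settings[:"group.id"] = 'my-group'
consumer_settings[:"enable.auto.offset.store"] = false
# ||= keeps an explicitly configured auto.offset.reset and only falls back
# to the routing-level initial offset when none was provided
consumer_settings[:"auto.offset.reset"] ||= 'earliest'
```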