Class: Karafka::Admin

Inherits:
Object
  • Object
show all
Extended by:
Core::Helpers::Time
Defined in:
lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/topics.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/replication.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/consumer_groups.rb,
lib/karafka/admin/configs/resource.rb,
lib/karafka/admin/isolation_levels.rb,
lib/karafka/admin/contracts/replication.rb

Overview

Note:

It always initializes a new admin instance as we want to ensure it is always closed Since admin actions are not performed that often, that should be ok.

Note:

By default it uses the primary defined cluster. For multi-cluster operations, create an Admin instance with custom kafka configuration: ‘Karafka::Admin.new(kafka: { ’bootstrap.servers’: ‘other:9092’ })‘

Admin actions that we can perform via Karafka on our Kafka cluster

Defined Under Namespace

Modules: Contracts, IsolationLevels Classes: Acl, Configs, ConsumerGroups, Replication, Topics

Constant Summary collapse

Recovery =

We alias this for Pro users so we don’t end up having two Admin namespaces from the end user perspective. This enhances the UX.

Karafka::Pro::Admin::Recovery

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(kafka: {}) ⇒ Admin

Creates a new Admin instance

Examples:

Create admin for a different cluster

admin = Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other-cluster:9092' })
admin.cluster_info

Parameters:

  • kafka (Hash) (defaults to: {})

    custom kafka configuration to merge with app defaults. Useful for multi-cluster operations where you want to target a different cluster.



37
38
39
# File 'lib/karafka/admin.rb', line 37

def initialize(kafka: {})
  @custom_kafka = kafka
end

Instance Attribute Details

#custom_kafkaHash (readonly)

Custom kafka configuration for this admin instance

Returns:

  • (Hash)

    custom kafka settings to merge with defaults



27
28
29
# File 'lib/karafka/admin.rb', line 27

def custom_kafka
  @custom_kafka
end

Class Method Details

.cluster_infoRdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



230
231
232
# File 'lib/karafka/admin.rb', line 230

def cluster_info
  new.cluster_info
end

.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean

Takes consumer group and its topics and copies all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

Returns:

  • (Boolean)

    true if anything was migrated, otherwise false

See Also:



126
127
128
# File 'lib/karafka/admin.rb', line 126

def copy_consumer_group(previous_name, new_name, topics)
  new.copy_consumer_group(previous_name, new_name, topics)
end

.create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

See Also:



87
88
89
# File 'lib/karafka/admin.rb', line 87

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details

See Also:



74
75
76
# File 'lib/karafka/admin.rb', line 74

def create_topic(name, partitions, replication_factor, topic_config = {})
  new.create_topic(name, partitions, replication_factor, topic_config)
end

.delete_consumer_group(group_id) ⇒ Object

Removes given group (if exists)

Parameters:

  • group_id (String)

    group name

See Also:



153
154
155
# File 'lib/karafka/admin.rb', line 153

def delete_consumer_group(group_id)
  new.delete_consumer_group(group_id)
end

.delete_topic(name) ⇒ Object

Parameters:

  • name (String)

    topic name

See Also:



80
81
82
# File 'lib/karafka/admin.rb', line 80

def delete_topic(name)
  new.delete_topic(name)
end

.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication

Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools. Since librdkafka does not support increasing replication factor directly, this method generates the necessary JSON and commands for manual execution.

Examples:

Plan replication increase with automatic broker distribution

plan = Karafka::Admin.plan_topic_replication(topic: 'events', replication_factor: 3)

# Review the plan
puts plan.summary

# Export JSON for Kafka's reassignment tools
plan.export_to_file('reassignment.json')

# Execute the plan (replace <KAFKA_BROKERS> with actual brokers)
system(plan.execution_commands[:execute].gsub('<KAFKA_BROKERS>', 'localhost:9092'))

Plan replication with manual broker placement - specify brokers

plan = Karafka::Admin.plan_topic_replication(
  topic: 'events',
  replication_factor: 3,
  brokers: {
    0 => [1, 2, 4],  # Partition 0 on brokers 1, 2, 4
    1 => [2, 3, 4],  # Partition 1 on brokers 2, 3, 4
    2 => [1, 3, 5]   # Partition 2 on brokers 1, 3, 5
  }
)

# The plan will use your exact broker specifications
puts plan.partitions_assignment
# => { 0=>[1, 2, 4], 1=>[2, 3, 4], 2=>[1, 3, 5] }

Parameters:

  • topic (String)

    name of the topic to plan replication for

  • replication_factor (Integer)

    target replication factor (must be higher than current)

  • brokers (Hash{Integer => Array<Integer>}) (defaults to: nil)

    optional manual broker assignments per partition. Keys are partition IDs, values are arrays of broker IDs. If not provided, assignments distribution will happen automatically.

Returns:

  • (Replication)

    plan object with JSON, commands, and instructions

See Also:



221
222
223
224
225
226
227
# File 'lib/karafka/admin.rb', line 221

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  new.plan_topic_replication(
    topic: topic,
    replication_factor: replication_factor,
    brokers: brokers
  )
end

.read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}

Reads lags and offsets for given topics in the context of groups defined in the routing

Parameters:

  • groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

    hash with group names with array of topics to query per group inside

  • active_topics_only (Boolean) (defaults to: true)

    if set to false, when we use routing topics, will select also topics that are marked as inactive in routing

Returns:

  • (Hash{String => Hash{Integer => Hash{Integer => Object}}})

    hash where the top level keys are the groups and values are hashes with topics and inside partitions with lags and offsets

See Also:



175
176
177
178
179
180
# File 'lib/karafka/admin.rb', line 175

def read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    groups_with_topics,
    active_topics_only: active_topics_only
  )
end

.read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object

Parameters:

  • topic_partition_offsets (Hash{String => Array<Hash>})

    topics with partition specs

  • isolation_level (Integer, nil) (defaults to: nil)

    optional isolation level constant

See Also:



101
102
103
# File 'lib/karafka/admin.rb', line 101

def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  new.read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
end

.read_topic(name, partition, count, start_offset = -1,, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1,)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings

See Also:



65
66
67
# File 'lib/karafka/admin.rb', line 65

def read_topic(name, partition, count, start_offset = -1, settings = {})
  new.read_topic(name, partition, count, start_offset, settings)
end

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

See Also:



94
95
96
# File 'lib/karafka/admin.rb', line 94

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end

.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean

Takes consumer group and its topics and migrates all the offsets to a new named group

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename. Defaults to true.

Returns:

  • (Boolean)

    true if rename (and optionally removal) was ok or false if there was nothing really to rename

See Also:



140
141
142
143
144
145
146
147
# File 'lib/karafka/admin.rb', line 140

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  new.rename_consumer_group(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end

.seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • group_id (String)

    id of the group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

    Hash with list of topics and settings

See Also:



115
116
117
# File 'lib/karafka/admin.rb', line 115

def seek_consumer_group(group_id, topics_with_partitions_and_offsets)
  new.seek_consumer_group(group_id, topics_with_partitions_and_offsets)
end

.topic_info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

See Also:



107
108
109
# File 'lib/karafka/admin.rb', line 107

def topic_info(topic_name)
  new.topic_info(topic_name)
end

.trigger_rebalance(group_id) ⇒ Object

Note:

This API should be used only for development.

Triggers a rebalance for the specified group

Parameters:

  • group_id (String)

    group id to trigger rebalance for

See Also:



162
163
164
# File 'lib/karafka/admin.rb', line 162

def trigger_rebalance(group_id)
  new.trigger_rebalance(group_id)
end

.with_adminObject

Creates admin instance and yields it. After usage it closes the admin instance



246
247
248
# File 'lib/karafka/admin.rb', line 246

def with_admin(&)
  new.with_admin(&)
end

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high frequency calls that would have to be delegated

Creates consumer instance and yields it. After usage it closes the consumer instance This API can be used in other pieces of code and allows for low-level consumer usage

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



241
242
243
# File 'lib/karafka/admin.rb', line 241

def with_consumer(settings = {}, &)
  new.with_consumer(settings, &)
end

Instance Method Details

#closeObject

No-op close to normalize the API surface.

Each admin operation currently opens and closes its own underlying rdkafka admin instance internally, so there is nothing to release at the ‘Karafka::Admin` level right now. This method exists so that callers who hold an instance and call `#close` on it (matching the pattern of other closeable resources) do not raise `NoMethodError`.

In the future, ‘Karafka::Admin` is planned to be refactored to reuse a single rdkafka admin instance across multiple operations rather than creating and tearing one down per call. When that happens, this method will need to release that shared instance. The no-op is here now so that all callers are already written against the correct API and require no changes when the real implementation lands.



53
54
# File 'lib/karafka/admin.rb', line 53

def close
end

#cluster_infoRdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



373
374
375
# File 'lib/karafka/admin.rb', line 373

def cluster_info
  with_admin(&:metadata)
end

#copy_consumer_group(previous_name, new_name, topics) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to copy offsets

See Also:



319
320
321
# File 'lib/karafka/admin.rb', line 319

def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.new(kafka: @custom_kafka).copy(previous_name, new_name, topics)
end

#create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

See Also:



281
282
283
# File 'lib/karafka/admin.rb', line 281

def create_partitions(name, partitions)
  Topics.new(kafka: @custom_kafka).create_partitions(name, partitions)
end

#create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions for this topic

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details

See Also:



268
269
270
# File 'lib/karafka/admin.rb', line 268

def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.new(kafka: @custom_kafka).create(name, partitions, replication_factor, topic_config)
end

#delete_consumer_group(group_id) ⇒ Object

Parameters:

  • group_id (String)

    group name

See Also:



339
340
341
# File 'lib/karafka/admin.rb', line 339

def delete_consumer_group(group_id)
  ConsumerGroups.new(kafka: @custom_kafka).delete(group_id)
end

#delete_topic(name) ⇒ Object

Parameters:

  • name (String)

    topic name

See Also:



274
275
276
# File 'lib/karafka/admin.rb', line 274

def delete_topic(name)
  Topics.new(kafka: @custom_kafka).delete(name)
end

#plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object

Parameters:

  • topic (String)

    topic name to plan replication for

  • replication_factor (Integer)

    target replication factor

  • brokers (Hash, nil) (defaults to: nil)

    optional manual broker assignments per partition

See Also:



364
365
366
367
368
369
370
# File 'lib/karafka/admin.rb', line 364

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  Replication.new(kafka: @custom_kafka).plan(
    topic: topic,
    to: replication_factor,
    brokers: brokers
  )
end

#read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Object

Parameters:

  • groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

    hash with group names with array of topics

  • active_topics_only (Boolean) (defaults to: true)

    if set to false, will select also inactive topics

See Also:



353
354
355
356
357
358
# File 'lib/karafka/admin.rb', line 353

def read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.new(kafka: @custom_kafka).read_lags_with_offsets(
    groups_with_topics,
    active_topics_only: active_topics_only
  )
end

#read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object

Parameters:

  • topic_partition_offsets (Hash{String => Array<Hash>})

    topics with partition specs

  • isolation_level (Integer, nil) (defaults to: nil)

    optional isolation level constant

See Also:



295
296
297
# File 'lib/karafka/admin.rb', line 295

def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  Topics.new(kafka: @custom_kafka).read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
end

#read_topic(name, partition, count, start_offset = -1,, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1,)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

See Also:



259
260
261
# File 'lib/karafka/admin.rb', line 259

def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.new(kafka: @custom_kafka).read(name, partition, count, start_offset, settings)
end

#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

See Also:



288
289
290
# File 'lib/karafka/admin.rb', line 288

def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.new(kafka: @custom_kafka).read_watermark_offsets(name_or_hash, partition)
end

#rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename

See Also:



328
329
330
331
332
333
334
335
# File 'lib/karafka/admin.rb', line 328

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.new(kafka: @custom_kafka).rename(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end

#seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • group_id (String)

    group for which we want to move offsets

  • topics_with_partitions_and_offsets (Hash)

    hash with topics and settings

See Also:



308
309
310
311
312
313
# File 'lib/karafka/admin.rb', line 308

def seek_consumer_group(group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.new(kafka: @custom_kafka).seek(
    group_id,
    topics_with_partitions_and_offsets
  )
end

#topic_info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

See Also:



301
302
303
# File 'lib/karafka/admin.rb', line 301

def topic_info(topic_name)
  Topics.new(kafka: @custom_kafka).info(topic_name)
end

#trigger_rebalance(group_id) ⇒ Object

Parameters:

  • group_id (String)

    group id to trigger rebalance for

See Also:



345
346
347
# File 'lib/karafka/admin.rb', line 345

def trigger_rebalance(group_id)
  ConsumerGroups.new(kafka: @custom_kafka).trigger_rebalance(group_id)
end

#with_adminObject

Creates admin instance and yields it. After usage it closes the admin instance



410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/karafka/admin.rb', line 410

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: self.class.poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end

#with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high frequency calls that would have to be delegated

Creates consumer instance and yields it. After usage it closes the consumer instance This API can be used in other pieces of code and allows for low-level consumer usage

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/karafka/admin.rb', line 384

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end