Class: Karafka::Admin

Inherits: Object
Extended by:
Core::Helpers::Time
Defined in:
lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/topics.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/replication.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/consumer_groups.rb,
lib/karafka/admin/configs/resource.rb,
lib/karafka/admin/contracts/replication.rb

Overview

Note:

It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, this is acceptable.

Note:

By default it uses the primary defined cluster. For multi-cluster operations, create an Admin instance with custom kafka configuration: `Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other:9092' })`

Admin actions that we can perform via Karafka on our Kafka cluster

Defined Under Namespace

Modules: Contracts
Classes: Acl, Configs, ConsumerGroups, Replication, Topics

Constant Summary

Recovery =

We alias this for Pro users so we don’t end up having two Admin namespaces from the end user perspective. This enhances the UX.

Karafka::Pro::Admin::Recovery

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(kafka: {}) ⇒ Admin

Creates a new Admin instance

Examples:

Create admin for a different cluster

admin = Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other-cluster:9092' })
admin.cluster_info

Parameters:

  • kafka (Hash) (defaults to: {})

    custom kafka configuration to merge with app defaults. Useful for multi-cluster operations where you want to target a different cluster.



# File 'lib/karafka/admin.rb', line 37

def initialize(kafka: {})
  @custom_kafka = kafka
end

Instance Attribute Details

#custom_kafka ⇒ Hash (readonly)

Custom kafka configuration for this admin instance

Returns:

  • (Hash)

    custom kafka settings to merge with defaults



# File 'lib/karafka/admin.rb', line 27

def custom_kafka
  @custom_kafka
end

Class Method Details

.cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 209

def cluster_info
  new.cluster_info
end

.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean

Takes a consumer group and its topics and copies all the offsets to a new group with the given name

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

Returns:

  • (Boolean)

    true if anything was migrated, otherwise false

See Also:



# File 'lib/karafka/admin.rb', line 104

def copy_consumer_group(previous_name, new_name, topics)
  new.copy_consumer_group(previous_name, new_name, topics)
end

.create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

See Also:



# File 'lib/karafka/admin.rb', line 72

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end

.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions we expect

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details

See Also:



# File 'lib/karafka/admin.rb', line 59

def create_topic(name, partitions, replication_factor, topic_config = {})
  new.create_topic(name, partitions, replication_factor, topic_config)
end
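As a sketch of the `topic_config` argument: the hash takes Kafka topic-level settings by name. The `create_topic` call itself is commented out here since it requires a running cluster, and the topic name and values are illustrative:

```ruby
# Kafka topic-level settings, passed through to the cluster as-is
topic_config = {
  'cleanup.policy' => 'compact',
  # Retain data for 7 days (value is milliseconds, as a string)
  'retention.ms' => (7 * 24 * 60 * 60 * 1000).to_s
}

# Would create "events" with 6 partitions and replication factor 3:
# Karafka::Admin.create_topic('events', 6, 3, topic_config)
puts topic_config['retention.ms'] # => "604800000"
```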

.delete_consumer_group(consumer_group_id) ⇒ Object

Removes given consumer group (if exists)

Parameters:

  • consumer_group_id (String)

    consumer group name

See Also:



# File 'lib/karafka/admin.rb', line 131

def delete_consumer_group(consumer_group_id)
  new.delete_consumer_group(consumer_group_id)
end

.delete_topic(name) ⇒ Object

Parameters:

  • name (String)

    topic name

See Also:



# File 'lib/karafka/admin.rb', line 65

def delete_topic(name)
  new.delete_topic(name)
end

.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication

Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools. Since librdkafka does not support increasing replication factor directly, this method generates the necessary JSON and commands for manual execution.

Examples:

Plan replication increase with automatic broker distribution

plan = Karafka::Admin.plan_topic_replication(topic: 'events', replication_factor: 3)

# Review the plan
puts plan.summary

# Export JSON for Kafka's reassignment tools
plan.export_to_file('reassignment.json')

# Execute the plan (replace <KAFKA_BROKERS> with actual brokers)
system(plan.execution_commands[:execute].gsub('<KAFKA_BROKERS>', 'localhost:9092'))

Plan replication with manual broker placement - specify brokers

plan = Karafka::Admin.plan_topic_replication(
  topic: 'events',
  replication_factor: 3,
  brokers: {
    0 => [1, 2, 4],  # Partition 0 on brokers 1, 2, 4
    1 => [2, 3, 4],  # Partition 1 on brokers 2, 3, 4
    2 => [1, 3, 5]   # Partition 2 on brokers 1, 3, 5
  }
)

# The plan will use your exact broker specifications
puts plan.partitions_assignment
# => { 0=>[1, 2, 4], 1=>[2, 3, 4], 2=>[1, 3, 5] }

Parameters:

  • topic (String)

    name of the topic to plan replication for

  • replication_factor (Integer)

    target replication factor (must be higher than current)

  • brokers (Hash{Integer => Array<Integer>}) (defaults to: nil)

optional manual broker assignments per partition. Keys are partition IDs, values are arrays of broker IDs. If not provided, assignments will be distributed automatically.

Returns:

  • (Replication)

    plan object with JSON, commands, and instructions

See Also:



# File 'lib/karafka/admin.rb', line 200

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  new.plan_topic_replication(
    topic: topic,
    replication_factor: replication_factor,
    brokers: brokers
  )
end

.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}

Reads lags and offsets for given topics in the context of consumer groups defined in the routing

Parameters:

  • consumer_groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

hash with consumer group names mapped to arrays of topics to query per consumer group

  • active_topics_only (Boolean) (defaults to: true)

if set to false, topics marked as inactive in the routing will also be selected

Returns:

  • (Hash{String => Hash{Integer => Hash{Integer => Object}}})

hash where the top-level keys are consumer groups and the values are hashes of topics, each containing partitions with their lags and offsets

See Also:



# File 'lib/karafka/admin.rb', line 154

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
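A sketch of the query and the nested result shape described above. The inner key names (`lag`, `offset`) and all values here are illustrative assumptions, since real data requires a cluster:

```ruby
# Consumer group names mapped to the topics we want to inspect
query = { 'example_app_group' => %w[events notifications] }
# lags = Karafka::Admin.read_lags_with_offsets(query)

# Illustrative result shape: group => topic => partition => details
lags = {
  'example_app_group' => {
    'events' => {
      0 => { lag: 5, offset: 100 },
      1 => { lag: 0, offset: 42 }
    }
  }
}

total_lag = lags['example_app_group']['events'].values.sum { |p| p[:lag] }
puts total_lag # => 5
```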

.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings

See Also:



# File 'lib/karafka/admin.rb', line 50

def read_topic(name, partition, count, start_offset = -1, settings = {})
  new.read_topic(name, partition, count, start_offset, settings)
end
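A sketch of typical calls: passing a negative `start_offset` is understood here to fetch the most recent messages, while a `Time` starts reading from that timestamp. The calls are commented out since they require a cluster:

```ruby
topic = 'events'
partition = 0

# Last 10 messages (start_offset defaults to -1):
# messages = Karafka::Admin.read_topic(topic, partition, 10)

# Messages starting from roughly one hour ago:
start_at = Time.now - 3600
# messages = Karafka::Admin.read_topic(topic, partition, 10, start_at)

# Each returned message is a Karafka message object, so for example:
# messages.each { |message| puts "#{message.offset}: #{message.raw_payload}" }
puts start_at.class # => Time
```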

.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

See Also:



# File 'lib/karafka/admin.rb', line 79

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end
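A sketch of both call styles; the hash form batches queries per topic and partition. The calls are commented out since they need a cluster, and the return shapes noted here are assumptions:

```ruby
# Single topic and partition; assumed to return low and high watermarks:
# low, high = Karafka::Admin.read_watermark_offsets('events', 0)

# Batched hash form: topic names mapped to arrays of partition ids
query = { 'events' => [0, 1], 'notifications' => [0] }
# offsets = Karafka::Admin.read_watermark_offsets(query)
puts query.values.flatten.size # => 3
```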

.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean

Takes a consumer group and its topics and migrates all the offsets to a group with the new name

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets during rename

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename. Defaults to true.

Returns:

  • (Boolean)

true if the rename (and optional removal) succeeded, false if there was nothing to rename

See Also:



# File 'lib/karafka/admin.rb', line 118

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  new.rename_consumer_group(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end

.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which we want to move the existing offset

  • topics_with_partitions_and_offsets (Hash)

    Hash with list of topics and settings

See Also:



# File 'lib/karafka/admin.rb', line 93

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  new.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
end
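A sketch of the `topics_with_partitions_and_offsets` structure: topics mapped to partitions and target offsets. The call is commented out since it needs a cluster, and as a general rule the group should not be actively consuming while its offsets are moved:

```ruby
# Move partition 0 back to offset 0 and partition 1 to offset 100
seek_map = {
  'events' => { 0 => 0, 1 => 100 }
}

# Karafka::Admin.seek_consumer_group('example_app_group', seek_map)
puts seek_map['events'].keys.inspect # => [0, 1]
```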

.topic_info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

See Also:



# File 'lib/karafka/admin.rb', line 85

def topic_info(topic_name)
  new.topic_info(topic_name)
end

.trigger_rebalance(consumer_group_id) ⇒ Object

Note:

This API should be used only for development.

Triggers a rebalance for the specified consumer group

Parameters:

  • consumer_group_id (String)

    consumer group id to trigger rebalance for

See Also:



# File 'lib/karafka/admin.rb', line 140

def trigger_rebalance(consumer_group_id)
  new.trigger_rebalance(consumer_group_id)
end

.with_admin ⇒ Object

Creates an admin instance and yields it. After usage it closes the admin instance.



# File 'lib/karafka/admin.rb', line 225

def with_admin(&)
  new.with_admin(&)
end

.with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant; there are no high-frequency calls that would have to be delegated.

Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 220

def with_consumer(settings = {}, &)
  new.with_consumer(settings, &)
end
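A sketch of both helpers: each yields a proxied client and closes it afterwards. The calls are commented out since they require a cluster; the settings key shown is a standard librdkafka option:

```ruby
# Extra librdkafka settings merged into the consumer configuration
settings = { 'enable.partition.eof': true }

# Low-level consumer usage:
# Karafka::Admin.with_consumer(settings) do |consumer|
#   consumer.subscribe('events')
#   # ... poll, inspect offsets, etc.
# end

# Low-level admin usage:
# Karafka::Admin.with_admin do |admin|
#   puts admin.metadata
# end
puts settings.keys.first
```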

Instance Method Details

#cluster_info ⇒ Rdkafka::Metadata

Returns cluster metadata info.

Returns:

  • (Rdkafka::Metadata)

    cluster metadata info



# File 'lib/karafka/admin.rb', line 345

def cluster_info
  with_admin(&:metadata)
end

#copy_consumer_group(previous_name, new_name, topics) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to copy offsets

See Also:



# File 'lib/karafka/admin.rb', line 291

def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.new(kafka: @custom_kafka).copy(previous_name, new_name, topics)
end

#create_partitions(name, partitions) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    total number of partitions we expect to end up with

See Also:



# File 'lib/karafka/admin.rb', line 260

def create_partitions(name, partitions)
  Topics.new(kafka: @custom_kafka).create_partitions(name, partitions)
end

#create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object

Parameters:

  • name (String)

    topic name

  • partitions (Integer)

    number of partitions for this topic

  • replication_factor (Integer)

    number of replicas

  • topic_config (Hash) (defaults to: {})

    topic config details

See Also:



# File 'lib/karafka/admin.rb', line 247

def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.new(kafka: @custom_kafka).create(name, partitions, replication_factor, topic_config)
end

#delete_consumer_group(consumer_group_id) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group name

See Also:



# File 'lib/karafka/admin.rb', line 311

def delete_consumer_group(consumer_group_id)
  ConsumerGroups.new(kafka: @custom_kafka).delete(consumer_group_id)
end

#delete_topic(name) ⇒ Object

Parameters:

  • name (String)

    topic name

See Also:



# File 'lib/karafka/admin.rb', line 253

def delete_topic(name)
  Topics.new(kafka: @custom_kafka).delete(name)
end

#plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object

Parameters:

  • topic (String)

    topic name to plan replication for

  • replication_factor (Integer)

    target replication factor

  • brokers (Hash, nil) (defaults to: nil)

    optional manual broker assignments per partition

See Also:



# File 'lib/karafka/admin.rb', line 336

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  Replication.new(kafka: @custom_kafka).plan(
    topic: topic,
    to: replication_factor,
    brokers: brokers
  )
end

#read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object

Parameters:

  • consumer_groups_with_topics (Hash{String => Array<String>}) (defaults to: {})

hash with consumer group names mapped to arrays of topics

  • active_topics_only (Boolean) (defaults to: true)

if set to false, inactive topics will also be selected

See Also:



# File 'lib/karafka/admin.rb', line 325

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.new(kafka: @custom_kafka).read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end

#read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object

Parameters:

  • name (String, Symbol)

    topic name

  • partition (Integer)

    partition

  • count (Integer)

    how many messages we want to get at most

  • start_offset (Integer, Time) (defaults to: -1)

    offset from which we should start

  • settings (Hash) (defaults to: {})

    kafka extra settings (optional)

See Also:



# File 'lib/karafka/admin.rb', line 238

def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.new(kafka: @custom_kafka).read(name, partition, count, start_offset, settings)
end

#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object

Parameters:

  • name_or_hash (String, Symbol, Hash)

    topic name or hash with topics and partitions

  • partition (Integer, nil) (defaults to: nil)

    partition (nil when using hash format)

See Also:



# File 'lib/karafka/admin.rb', line 267

def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.new(kafka: @custom_kafka).read_watermark_offsets(name_or_hash, partition)
end

#rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object

Parameters:

  • previous_name (String)

    old consumer group name

  • new_name (String)

    new consumer group name

  • topics (Array<String>)

    topics for which we want to migrate offsets

  • delete_previous (Boolean) (defaults to: true)

    should we delete previous consumer group after rename

See Also:



# File 'lib/karafka/admin.rb', line 300

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.new(kafka: @custom_kafka).rename(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end

#seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group for which we want to move offsets

  • topics_with_partitions_and_offsets (Hash)

    hash with topics and settings

See Also:



# File 'lib/karafka/admin.rb', line 280

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.new(kafka: @custom_kafka).seek(
    consumer_group_id,
    topics_with_partitions_and_offsets
  )
end

#topic_info(topic_name) ⇒ Object

Parameters:

  • topic_name (String)

    name of the topic we’re interested in

See Also:



# File 'lib/karafka/admin.rb', line 273

def topic_info(topic_name)
  Topics.new(kafka: @custom_kafka).info(topic_name)
end

#trigger_rebalance(consumer_group_id) ⇒ Object

Parameters:

  • consumer_group_id (String)

    consumer group id to trigger rebalance for

See Also:



# File 'lib/karafka/admin.rb', line 317

def trigger_rebalance(consumer_group_id)
  ConsumerGroups.new(kafka: @custom_kafka).trigger_rebalance(consumer_group_id)
end

#with_admin ⇒ Object

Creates an admin instance and yields it. After usage it closes the admin instance.



# File 'lib/karafka/admin.rb', line 382

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: self.class.poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end

#with_consumer(settings = {}) ⇒ Object

Note:

We always ship and yield a proxied consumer because admin API performance is not that relevant; there are no high-frequency calls that would have to be delegated.

Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.

Parameters:

  • settings (Hash) (defaults to: {})

    extra settings to customize consumer



# File 'lib/karafka/admin.rb', line 356

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end