Class: Karafka::Admin
- Inherits:
-
Object
- Object
- Karafka::Admin
- Extended by:
- Core::Helpers::Time
- Defined in:
- lib/karafka/admin.rb,
lib/karafka/admin/acl.rb,
lib/karafka/admin/topics.rb,
lib/karafka/admin/configs.rb,
lib/karafka/admin/replication.rb,
lib/karafka/admin/configs/config.rb,
lib/karafka/admin/consumer_groups.rb,
lib/karafka/admin/configs/resource.rb,
lib/karafka/admin/contracts/replication.rb
Overview
It always initializes a new admin instance, as we want to ensure it is always closed. Since admin actions are not performed that often, that should be ok.
By default it uses the primary defined cluster. For multi-cluster operations, create an Admin instance with a custom kafka configuration: `Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other:9092' })`
Admin actions that we can perform via Karafka on our Kafka cluster
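A brief sketch of the two usage modes described above; the broker address is a placeholder, not a value from this codebase, and the cluster-dependent calls are commented out:

```ruby
# Class-level calls operate on the primary defined cluster:
#
#   Karafka::Admin.cluster_info
#
# An instance can target another cluster via a custom kafka configuration
# (this hash is a placeholder config):
other_cluster = { 'bootstrap.servers': 'other:9092' }
# Karafka::Admin.new(kafka: other_cluster).cluster_info
```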
Direct Known Subclasses
Acl, Configs, ConsumerGroups, Replication, Topics, Pro::Admin::Recovery
Defined Under Namespace
Modules: Contracts
Classes: Acl, Configs, ConsumerGroups, Replication, Topics
Constant Summary collapse
- Recovery =
We alias this for Pro users so we don’t end up having two Admin namespaces from the end user perspective. This enhances the UX.
Karafka::Pro::Admin::Recovery
Instance Attribute Summary collapse
-
#custom_kafka ⇒ Hash
readonly
Custom kafka configuration for this admin instance.
Class Method Summary collapse
-
.cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
-
.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
Takes a consumer group and its topics and copies all the offsets to a group with the new name.
- .create_partitions(name, partitions) ⇒ Object
- .create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
-
.delete_consumer_group(consumer_group_id) ⇒ Object
Removes the given consumer group (if it exists).
- .delete_topic(name) ⇒ Object
-
.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication
Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools.
-
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
Reads lags and offsets for given topics in the context of consumer groups defined in the routing.
- .read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- .read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
-
.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
Takes a consumer group and its topics and migrates all the offsets to a group with the new name.
- .seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
- .topic_info(topic_name) ⇒ Object
-
.trigger_rebalance(consumer_group_id) ⇒ Object
Triggers a rebalance for the specified consumer group.
-
.with_admin ⇒ Object
Creates admin instance and yields it.
-
.with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Instance Method Summary collapse
-
#cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
- #copy_consumer_group(previous_name, new_name, topics) ⇒ Object
- #create_partitions(name, partitions) ⇒ Object
- #create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
- #delete_consumer_group(consumer_group_id) ⇒ Object
- #delete_topic(name) ⇒ Object
-
#initialize(kafka: {}) ⇒ Admin
constructor
Creates a new Admin instance.
- #plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object
- #read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object
- #read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- #read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
- #rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object
- #seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
- #topic_info(topic_name) ⇒ Object
- #trigger_rebalance(consumer_group_id) ⇒ Object
-
#with_admin ⇒ Object
Creates admin instance and yields it.
-
#with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Constructor Details
#initialize(kafka: {}) ⇒ Admin
Creates a new Admin instance
# File 'lib/karafka/admin.rb', line 37

def initialize(kafka: {})
  @custom_kafka = kafka
end
Instance Attribute Details
#custom_kafka ⇒ Hash (readonly)
Custom kafka configuration for this admin instance
# File 'lib/karafka/admin.rb', line 27

def custom_kafka
  @custom_kafka
end
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 209

def cluster_info
  new.cluster_info
end
.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
Takes a consumer group and its topics and copies all the offsets to a group with the new name.
# File 'lib/karafka/admin.rb', line 104

def copy_consumer_group(previous_name, new_name, topics)
  new.copy_consumer_group(previous_name, new_name, topics)
end
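For illustration, a hedged sketch of copying offsets; the group and topic names below are hypothetical, and the call itself needs a running cluster, so it is commented out:

```ruby
# Copy all committed offsets for these topics from one group to another.
topics = %w[orders payments]
# Karafka::Admin.copy_consumer_group('old_group', 'new_group', topics)
```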
.create_partitions(name, partitions) ⇒ Object
# File 'lib/karafka/admin.rb', line 72

def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 59

def create_topic(name, partitions, replication_factor, topic_config = {})
  new.create_topic(name, partitions, replication_factor, topic_config)
end
.delete_consumer_group(consumer_group_id) ⇒ Object
Removes the given consumer group (if it exists).
# File 'lib/karafka/admin.rb', line 131

def delete_consumer_group(consumer_group_id)
  new.delete_consumer_group(consumer_group_id)
end
.delete_topic(name) ⇒ Object
# File 'lib/karafka/admin.rb', line 65

def delete_topic(name)
  new.delete_topic(name)
end
.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication
Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools. Since librdkafka does not support increasing replication factor directly, this method generates the necessary JSON and commands for manual execution.
# File 'lib/karafka/admin.rb', line 200

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  new.plan_topic_replication(
    topic: topic,
    replication_factor: replication_factor,
    brokers: brokers
  )
end
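A sketch of planning (not executing) the increase; the topic name and broker ids are placeholders, and the call requires a running cluster, hence it is commented out:

```ruby
# The resulting plan carries the JSON and commands for Kafka's reassignment
# tooling; this call alone applies nothing to the cluster.
plan_args = { topic: 'events', replication_factor: 3, brokers: [1, 2, 3] }
# plan = Karafka::Admin.plan_topic_replication(**plan_args)
```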
.read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
Reads lags and offsets for given topics in the context of consumer groups defined in the routing.
# File 'lib/karafka/admin.rb', line 154

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
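The assumed argument shape, with hypothetical names; called without arguments, the groups and topics come from the routing definition instead:

```ruby
# Consumer group name => list of topic names to inspect.
consumer_groups_with_topics = { 'example_app' => %w[events] }
# Karafka::Admin.read_lags_with_offsets(consumer_groups_with_topics)
```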
.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 50

def read_topic(name, partition, count, start_offset = -1, settings = {})
  new.read_topic(name, partition, count, start_offset, settings)
end
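A sketch of reading messages; the topic name is hypothetical and the calls need a running cluster, so they are commented out. This assumes the default start_offset of -1 targets the most recent messages:

```ruby
# Read 5 messages from partition 0 of a (hypothetical) topic.
name, partition, count = 'events', 0, 5
# messages = Karafka::Admin.read_topic(name, partition, count)
# messages.each { |message| puts message.raw_payload }
```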
.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 79

def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end
.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
Takes a consumer group and its topics and migrates all the offsets to a group with the new name.
# File 'lib/karafka/admin.rb', line 118

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  new.rename_consumer_group(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end
.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin.rb', line 93

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  new.seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
end
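The offsets argument is assumed to map topics to per-partition offsets; the names and numbers below are placeholders and the call itself is commented out since it needs a cluster:

```ruby
# Topic name => { partition number => offset to seek the group to }.
topics_with_partitions_and_offsets = { 'events' => { 0 => 10 } }
# Karafka::Admin.seek_consumer_group('example_group', topics_with_partitions_and_offsets)
```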
.topic_info(topic_name) ⇒ Object
# File 'lib/karafka/admin.rb', line 85

def topic_info(topic_name)
  new.topic_info(topic_name)
end
.trigger_rebalance(consumer_group_id) ⇒ Object
This API should be used only for development.
Triggers a rebalance for the specified consumer group.
# File 'lib/karafka/admin.rb', line 140

def trigger_rebalance(consumer_group_id)
  new.trigger_rebalance(consumer_group_id)
end
.with_admin ⇒ Object
Creates an admin instance and yields it. After usage it closes the admin instance.
# File 'lib/karafka/admin.rb', line 225

def with_admin(&)
  new.with_admin(&)
end
.with_consumer(settings = {}) ⇒ Object
We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 220

def with_consumer(settings = {}, &)
  new.with_consumer(settings, &)
end
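Both helpers yield a proxied instance and close it afterwards; a sketch with placeholder settings and topic. The `metadata` call mirrors what `cluster_info` delegates to, while `query_watermark_offsets` inside the consumer block is an assumption about the proxied API:

```ruby
# Low-level consumer access; settings are passed through to the consumer.
settings = { 'group.id': 'example_probe' }
# Karafka::Admin.with_consumer(settings) do |consumer|
#   consumer.query_watermark_offsets('events', 0)
# end
#
# Karafka::Admin.with_admin do |admin|
#   admin.metadata
# end
```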
Instance Method Details
#cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 345

def cluster_info
  with_admin(&:metadata)
end
#copy_consumer_group(previous_name, new_name, topics) ⇒ Object
# File 'lib/karafka/admin.rb', line 291

def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.new(kafka: @custom_kafka).copy(previous_name, new_name, topics)
end
#create_partitions(name, partitions) ⇒ Object
# File 'lib/karafka/admin.rb', line 260

def create_partitions(name, partitions)
  Topics.new(kafka: @custom_kafka).create_partitions(name, partitions)
end
#create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 247

def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.new(kafka: @custom_kafka).create(name, partitions, replication_factor, topic_config)
end
#delete_consumer_group(consumer_group_id) ⇒ Object
# File 'lib/karafka/admin.rb', line 311

def delete_consumer_group(consumer_group_id)
  ConsumerGroups.new(kafka: @custom_kafka).delete(consumer_group_id)
end
#delete_topic(name) ⇒ Object
# File 'lib/karafka/admin.rb', line 253

def delete_topic(name)
  Topics.new(kafka: @custom_kafka).delete(name)
end
#plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 336

def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  Replication.new(kafka: @custom_kafka).plan(
    topic: topic,
    to: replication_factor,
    brokers: brokers
  )
end
#read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true) ⇒ Object
# File 'lib/karafka/admin.rb', line 325

def read_lags_with_offsets(consumer_groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.new(kafka: @custom_kafka).read_lags_with_offsets(
    consumer_groups_with_topics,
    active_topics_only: active_topics_only
  )
end
#read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 238

def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.new(kafka: @custom_kafka).read(name, partition, count, start_offset, settings)
end
#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 267

def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.new(kafka: @custom_kafka).read_watermark_offsets(name_or_hash, partition)
end
#rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object
# File 'lib/karafka/admin.rb', line 300

def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.new(kafka: @custom_kafka).rename(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end
#seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin.rb', line 280

def seek_consumer_group(consumer_group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.new(kafka: @custom_kafka).seek(
    consumer_group_id,
    topics_with_partitions_and_offsets
  )
end
#topic_info(topic_name) ⇒ Object
# File 'lib/karafka/admin.rb', line 273

def topic_info(topic_name)
  Topics.new(kafka: @custom_kafka).info(topic_name)
end
#trigger_rebalance(consumer_group_id) ⇒ Object
# File 'lib/karafka/admin.rb', line 317

def trigger_rebalance(consumer_group_id)
  ConsumerGroups.new(kafka: @custom_kafka).trigger_rebalance(consumer_group_id)
end
#with_admin ⇒ Object
Creates an admin instance and yields it. After usage it closes the admin instance.
# File 'lib/karafka/admin.rb', line 382

def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: self.class.poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close
  unbind_oauth(bind_id)
end
#with_consumer(settings = {}) ⇒ Object
We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 356

def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)
  consumer.start
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close
  unbind_oauth(bind_id)
end