Class: Karafka::Admin
- Inherits: Object
- Extended by: Core::Helpers::Time
- Defined in:
  lib/karafka/admin.rb,
  lib/karafka/admin/acl.rb,
  lib/karafka/admin/topics.rb,
  lib/karafka/admin/configs.rb,
  lib/karafka/admin/replication.rb,
  lib/karafka/admin/configs/config.rb,
  lib/karafka/admin/consumer_groups.rb,
  lib/karafka/admin/configs/resource.rb,
  lib/karafka/admin/isolation_levels.rb,
  lib/karafka/admin/contracts/replication.rb
Overview
Admin actions that we can perform via Karafka on our Kafka cluster.
Each operation initializes a new admin instance because we want to ensure that the instance is always closed afterwards. Since admin actions are not performed that often, this overhead should be acceptable.
By default it uses the primary defined cluster. For multi-cluster operations, create an Admin instance with a custom kafka configuration: `Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other:9092' })`
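The relationship between the class-level and instance-level APIs can be illustrated with a small stand-in class. This is a sketch, not Karafka code: `SketchAdmin` and `describe_target` are hypothetical names, and only the "class method builds a fresh instance and delegates" shape and the `kafka:` keyword are taken from this page.

```ruby
# Hypothetical stand-in for Karafka::Admin, reduced to the delegation pattern.
class SketchAdmin
  attr_reader :custom_kafka

  # Class-level call: builds a throwaway instance with no custom settings,
  # so it targets the primary (default) cluster.
  def self.describe_target
    new.describe_target
  end

  def initialize(kafka: {})
    @custom_kafka = kafka
  end

  # Instance-level call: reports which cluster this instance would use.
  def describe_target
    custom_kafka.fetch(:'bootstrap.servers', 'primary cluster')
  end
end

SketchAdmin.describe_target
# => "primary cluster"
SketchAdmin.new(kafka: { 'bootstrap.servers': 'other:9092' }).describe_target
# => "other:9092"
```

With the real class, the same split applies: `Karafka::Admin.topic_info(...)` goes to the primary cluster, while an instance created with custom `kafka:` settings carries them into every call.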
Direct Known Subclasses
Acl, Configs, ConsumerGroups, Replication, Topics, Pro::Admin::Recovery
Defined Under Namespace
Modules: Contracts, IsolationLevels
Classes: Acl, Configs, ConsumerGroups, Replication, Topics
Constant Summary
- Recovery =
We alias this for Pro users so we don’t end up having two Admin namespaces from the end user perspective. This enhances the UX.
Karafka::Pro::Admin::Recovery
Instance Attribute Summary
-
#custom_kafka ⇒ Hash
readonly
Custom kafka configuration for this admin instance.
Class Method Summary
-
.cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
-
.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
Takes a consumer group and its topics and copies all the offsets to a newly named group.
- .create_partitions(name, partitions) ⇒ Object
- .create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
-
.delete_consumer_group(group_id) ⇒ Object
Removes given group (if exists).
- .delete_topic(name) ⇒ Object
-
.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication
Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools.
-
.read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
Reads lags and offsets for given topics in the context of groups defined in the routing.
- .read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
- .read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- .read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
-
.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
Takes a consumer group and its topics and migrates all the offsets to a newly named group.
- .seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object
- .topic_info(topic_name) ⇒ Object
-
.trigger_rebalance(group_id) ⇒ Object
Triggers a rebalance for the specified group.
-
.with_admin ⇒ Object
Creates admin instance and yields it.
-
.with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Instance Method Summary
-
#close ⇒ Object
No-op close to normalize the API surface.
-
#cluster_info ⇒ Rdkafka::Metadata
Cluster metadata info.
- #copy_consumer_group(previous_name, new_name, topics) ⇒ Object
- #create_partitions(name, partitions) ⇒ Object
- #create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
- #delete_consumer_group(group_id) ⇒ Object
- #delete_topic(name) ⇒ Object
-
#initialize(kafka: {}) ⇒ Admin
constructor
Creates a new Admin instance.
- #plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object
- #read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Object
- #read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
- #read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
- #read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
- #rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object
- #seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object
- #topic_info(topic_name) ⇒ Object
- #trigger_rebalance(group_id) ⇒ Object
-
#with_admin ⇒ Object
Creates admin instance and yields it.
-
#with_consumer(settings = {}) ⇒ Object
Creates consumer instance and yields it.
Constructor Details
#initialize(kafka: {}) ⇒ Admin
Creates a new Admin instance.
# File 'lib/karafka/admin.rb', line 37
def initialize(kafka: {})
  @custom_kafka = kafka
end
Instance Attribute Details
#custom_kafka ⇒ Hash (readonly)
Custom kafka configuration for this admin instance.
# File 'lib/karafka/admin.rb', line 27
def custom_kafka
  @custom_kafka
end
Class Method Details
.cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 230
def cluster_info
  new.cluster_info
end
.copy_consumer_group(previous_name, new_name, topics) ⇒ Boolean
Takes a consumer group and its topics and copies all the offsets to a newly named group.
# File 'lib/karafka/admin.rb', line 126
def copy_consumer_group(previous_name, new_name, topics)
  new.copy_consumer_group(previous_name, new_name, topics)
end
.create_partitions(name, partitions) ⇒ Object
# File 'lib/karafka/admin.rb', line 87
def create_partitions(name, partitions)
  new.create_partitions(name, partitions)
end
.create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 74
def create_topic(name, partitions, replication_factor, topic_config = {})
  new.create_topic(name, partitions, replication_factor, topic_config)
end
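A minimal usage sketch of the positional arguments, assuming the karafka gem is loaded and a cluster is reachable when the helper is called with its default. The helper name, the topic name `events`, and the chosen partition and replication counts are illustrative; `cleanup.policy` is a standard Kafka topic setting. The `admin` parameter exists only so the call can be exercised against a stand-in object.

```ruby
# Hypothetical helper. `admin` defaults to Karafka::Admin (requires the karafka gem
# and a reachable cluster); any object responding to #create_topic works for a dry run.
def create_events_topic(admin = Karafka::Admin)
  admin.create_topic(
    'events',                        # name (illustrative)
    6,                               # partitions
    3,                               # replication_factor
    { 'cleanup.policy': 'compact' }  # topic_config, handed over to Kafka as topic settings
  )
end
```

The default argument is only evaluated when no `admin` is passed, so the method can be defined and tested without Karafka being present.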
.delete_consumer_group(group_id) ⇒ Object
Removes the given group (if it exists).
# File 'lib/karafka/admin.rb', line 153
def delete_consumer_group(group_id)
  new.delete_consumer_group(group_id)
end
.delete_topic(name) ⇒ Object
# File 'lib/karafka/admin.rb', line 80
def delete_topic(name)
  new.delete_topic(name)
end
.plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Replication
Plans a replication factor increase for a topic that can be used with Kafka’s reassignment tools. Since librdkafka does not support increasing replication factor directly, this method generates the necessary JSON and commands for manual execution.
# File 'lib/karafka/admin.rb', line 221
def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  new.plan_topic_replication(
    topic: topic,
    replication_factor: replication_factor,
    brokers: brokers
  )
end
.read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Hash{String => Hash{Integer => Hash{Integer => Object}}}
Reads lags and offsets for the given topics in the context of groups defined in the routing.
# File 'lib/karafka/admin.rb', line 175
def read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true)
  new.read_lags_with_offsets(
    groups_with_topics,
    active_topics_only: active_topics_only
  )
end
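The nested return value can be aggregated with plain Ruby. The sketch below assumes the result is keyed by consumer group, then topic, then partition, and that each partition entry is a hash with `:offset` and `:lag` keys; that inner shape is an assumption not spelled out on this page, so verify it against your Karafka version. `total_lag` and the sample data are hypothetical.

```ruby
# Sums lag across all groups, topics and partitions.
# Assumed shape: { group => { topic => { partition => { offset: Integer, lag: Integer } } } }
def total_lag(lags_with_offsets)
  lags_with_offsets.sum do |_group, topics|
    topics.sum do |_topic, partitions|
      # Defensive choice: treat any negative lag value as zero.
      partitions.sum { |_partition, details| [details.fetch(:lag), 0].max }
    end
  end
end

# Hand-written sample in the assumed shape (not real cluster output).
sample = {
  'example_group' => {
    'events' => {
      0 => { offset: 10, lag: 5 },
      1 => { offset: 7, lag: 0 }
    }
  }
}

total_lag(sample)
# => 5
```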
.read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 101
def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  new.read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
end
.read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 65
def read_topic(name, partition, count, start_offset = -1, settings = {})
  new.read_topic(name, partition, count, start_offset, settings)
end
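A short usage sketch, assuming that `read_topic` returns a collection of message objects responding to `raw_payload` and that leaving `start_offset` at its default of `-1` reads from the end of the partition. Both points are assumptions about behavior not described on this page. The helper name is hypothetical, and the `admin:` keyword is there only so the call can be tried against a stand-in.

```ruby
# Hypothetical helper: returns the raw payloads of up to `count` messages.
# With the default admin this needs the karafka gem and a reachable cluster.
def last_raw_payloads(topic, partition, count, admin: Karafka::Admin)
  admin
    .read_topic(topic, partition, count) # start_offset and settings keep their defaults
    .map(&:raw_payload)
end
```

For example, `last_raw_payloads('events', 0, 10)` would ask for ten messages from partition 0 of a topic named `events`.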
.read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 94
def read_watermark_offsets(name_or_hash, partition = nil)
  new.read_watermark_offsets(name_or_hash, partition)
end
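As a usage sketch for the single-topic form, the helper below assumes the call returns a two-element `[low, high]` pair for the given partition; the return shape is not documented on this page, so treat it as an assumption. `offset_span` is a hypothetical name.

```ruby
# Hypothetical helper: distance between the low and high watermark of a partition.
# On compacted topics this is an upper bound on the number of retained messages.
def offset_span(topic, partition, admin: Karafka::Admin)
  low, high = admin.read_watermark_offsets(topic, partition)
  high - low
end
```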
.rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Boolean
Takes a consumer group and its topics and migrates all the offsets to a newly named group.
# File 'lib/karafka/admin.rb', line 140
def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  new.rename_consumer_group(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end
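A usage sketch showing how `delete_previous:` can be switched off so the old group is kept after its offsets are migrated. The helper name and its `keep_old:` flag are hypothetical; only the `rename_consumer_group` signature is taken from this page.

```ruby
# Hypothetical helper: migrates offsets to a new group name and, by default,
# keeps the old group in place instead of deleting it.
def migrate_group(old_name, new_name, topics, keep_old: true, admin: Karafka::Admin)
  admin.rename_consumer_group(
    old_name,
    new_name,
    topics,                    # topics whose offsets should be carried over
    delete_previous: !keep_old # the documented default is true (old group removed)
  )
end
```

When the old group should stay untouched and no deletion is ever wanted, `copy_consumer_group` takes the same three positional arguments.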
.seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin.rb', line 115
def seek_consumer_group(group_id, topics_with_partitions_and_offsets)
  new.seek_consumer_group(group_id, topics_with_partitions_and_offsets)
end
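The second argument is a nested mapping. The sketch below assumes the shape `{ topic_name => { partition => offset } }`, which is what the parameter name suggests but is not spelled out on this page. `rewind_group` and its arguments are hypothetical.

```ruby
# Hypothetical helper: moves the listed partitions of one topic to the same offset.
# Assumed mapping shape: { 'topic' => { partition_id => target_offset } }
def rewind_group(group_id, topic, partitions, offset, admin: Karafka::Admin)
  mapping = { topic => partitions.to_h { |partition| [partition, offset] } }
  admin.seek_consumer_group(group_id, mapping)
end
```

For instance, `rewind_group('example_group', 'events', [0, 1], 100)` would build `{ 'events' => { 0 => 100, 1 => 100 } }` and pass it on.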
.topic_info(topic_name) ⇒ Object
# File 'lib/karafka/admin.rb', line 107
def topic_info(topic_name)
  new.topic_info(topic_name)
end
.trigger_rebalance(group_id) ⇒ Object
Note: This API should be used only for development.
Triggers a rebalance for the specified group.
# File 'lib/karafka/admin.rb', line 162
def trigger_rebalance(group_id)
  new.trigger_rebalance(group_id)
end
.with_admin ⇒ Object
Creates an admin instance and yields it. After usage, it closes the admin instance.
# File 'lib/karafka/admin.rb', line 246
def with_admin(&)
  new.with_admin(&)
end
.with_consumer(settings = {}) ⇒ Object
Note: We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage, it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 241
def with_consumer(settings = {}, &)
  new.with_consumer(settings, &)
end
Instance Method Details
#close ⇒ Object
No-op close to normalize the API surface.
Each admin operation currently opens and closes its own underlying rdkafka admin instance internally, so there is nothing to release at the `Karafka::Admin` level right now. This method exists so that callers who hold an instance and call `#close` on it (matching the pattern of other closeable resources) do not run into a `NoMethodError`.
In the future, `Karafka::Admin` is planned to be refactored to reuse a single rdkafka admin instance across multiple operations rather than creating and tearing one down per call. When that happens, this method will need to release that shared instance. The no-op is here now so that all callers are already written against the correct API and require no changes when the real implementation lands.
# File 'lib/karafka/admin.rb', line 53
def close
end
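Caller code written against this contract could look like the sketch below. `with_explicit_close` is a hypothetical helper, not part of Karafka; it only relies on the instance responding to `#close`, which is a no-op today.

```ruby
# Hypothetical helper: yields an admin instance and always calls #close afterwards,
# so the caller needs no changes if #close starts releasing a shared client later.
def with_explicit_close(admin)
  yield(admin)
ensure
  admin.close
end
```

With Karafka loaded, this could be used as `with_explicit_close(Karafka::Admin.new(kafka: { 'bootstrap.servers': 'other:9092' })) { |admin| admin.cluster_info }`.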
#cluster_info ⇒ Rdkafka::Metadata
Returns cluster metadata info.
# File 'lib/karafka/admin.rb', line 373
def cluster_info
  with_admin(&:metadata)
end
#copy_consumer_group(previous_name, new_name, topics) ⇒ Object
# File 'lib/karafka/admin.rb', line 319
def copy_consumer_group(previous_name, new_name, topics)
  ConsumerGroups.new(kafka: @custom_kafka).copy(previous_name, new_name, topics)
end
#create_partitions(name, partitions) ⇒ Object
# File 'lib/karafka/admin.rb', line 281
def create_partitions(name, partitions)
  Topics.new(kafka: @custom_kafka).create_partitions(name, partitions)
end
#create_topic(name, partitions, replication_factor, topic_config = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 268
def create_topic(name, partitions, replication_factor, topic_config = {})
  Topics.new(kafka: @custom_kafka).create(name, partitions, replication_factor, topic_config)
end
#delete_consumer_group(group_id) ⇒ Object
# File 'lib/karafka/admin.rb', line 339
def delete_consumer_group(group_id)
  ConsumerGroups.new(kafka: @custom_kafka).delete(group_id)
end
#delete_topic(name) ⇒ Object
# File 'lib/karafka/admin.rb', line 274
def delete_topic(name)
  Topics.new(kafka: @custom_kafka).delete(name)
end
#plan_topic_replication(topic:, replication_factor:, brokers: nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 364
def plan_topic_replication(topic:, replication_factor:, brokers: nil)
  Replication.new(kafka: @custom_kafka).plan(
    topic: topic,
    to: replication_factor,
    brokers: brokers
  )
end
#read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true) ⇒ Object
# File 'lib/karafka/admin.rb', line 353
def read_lags_with_offsets(groups_with_topics = {}, active_topics_only: true)
  ConsumerGroups.new(kafka: @custom_kafka).read_lags_with_offsets(
    groups_with_topics,
    active_topics_only: active_topics_only
  )
end
#read_partition_offsets(topic_partition_offsets, isolation_level: nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 295
def read_partition_offsets(topic_partition_offsets, isolation_level: nil)
  Topics.new(kafka: @custom_kafka).read_partition_offsets(topic_partition_offsets, isolation_level: isolation_level)
end
#read_topic(name, partition, count, start_offset = -1, settings = {}) ⇒ Object
# File 'lib/karafka/admin.rb', line 259
def read_topic(name, partition, count, start_offset = -1, settings = {})
  Topics.new(kafka: @custom_kafka).read(name, partition, count, start_offset, settings)
end
#read_watermark_offsets(name_or_hash, partition = nil) ⇒ Object
# File 'lib/karafka/admin.rb', line 288
def read_watermark_offsets(name_or_hash, partition = nil)
  Topics.new(kafka: @custom_kafka).read_watermark_offsets(name_or_hash, partition)
end
#rename_consumer_group(previous_name, new_name, topics, delete_previous: true) ⇒ Object
# File 'lib/karafka/admin.rb', line 328
def rename_consumer_group(previous_name, new_name, topics, delete_previous: true)
  ConsumerGroups.new(kafka: @custom_kafka).rename(
    previous_name,
    new_name,
    topics,
    delete_previous: delete_previous
  )
end
#seek_consumer_group(group_id, topics_with_partitions_and_offsets) ⇒ Object
# File 'lib/karafka/admin.rb', line 308
def seek_consumer_group(group_id, topics_with_partitions_and_offsets)
  ConsumerGroups.new(kafka: @custom_kafka).seek(
    group_id,
    topics_with_partitions_and_offsets
  )
end
#topic_info(topic_name) ⇒ Object
# File 'lib/karafka/admin.rb', line 301
def topic_info(topic_name)
  Topics.new(kafka: @custom_kafka).info(topic_name)
end
#trigger_rebalance(group_id) ⇒ Object
# File 'lib/karafka/admin.rb', line 345
def trigger_rebalance(group_id)
  ConsumerGroups.new(kafka: @custom_kafka).trigger_rebalance(group_id)
end
#with_admin ⇒ Object
Creates an admin instance and yields it. After usage, it closes the admin instance.
# File 'lib/karafka/admin.rb', line 410
def with_admin
  bind_id = SecureRandom.uuid

  admin = config(:producer, {}).admin(
    native_kafka_auto_start: false,
    native_kafka_poll_timeout_ms: self.class.poll_timeout
  )

  bind_oauth(bind_id, admin)

  admin.start
  proxy = Karafka::Connection::Proxy.new(admin)
  yield(proxy)
ensure
  admin&.close

  unbind_oauth(bind_id)
end
#with_consumer(settings = {}) ⇒ Object
Note: We always ship and yield a proxied consumer because admin API performance is not that relevant. That is, there are no high-frequency calls that would have to be delegated.
Creates a consumer instance and yields it. After usage, it closes the consumer instance. This API can be used in other pieces of code and allows for low-level consumer usage.
# File 'lib/karafka/admin.rb', line 384
def with_consumer(settings = {})
  bind_id = SecureRandom.uuid

  consumer = config(:consumer, settings).consumer(native_kafka_auto_start: false)
  bind_oauth(bind_id, consumer)

  consumer.start
  proxy = Karafka::Connection::Proxy.new(consumer)
  yield(proxy)
ensure
  # Always unsubscribe consumer just to be sure, that no metadata requests are running
  # when we close the consumer. This in theory should prevent from some race-conditions
  # that originate from librdkafka
  begin
    consumer&.unsubscribe
  # Ignore any errors and continue to close consumer despite them
  rescue Rdkafka::RdkafkaError
    nil
  end

  consumer&.close

  unbind_oauth(bind_id)
end
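The cleanup ordering in the `ensure` block (unsubscribe, swallow the client error, then close regardless) can be tried in isolation with stand-ins. Everything in this sketch is hypothetical: `SketchConsumer` and `SketchClientError` merely play the roles of the rdkafka consumer and `Rdkafka::RdkafkaError`, and OAuth binding is left out.

```ruby
# Stand-in for the client error raised during unsubscribe.
class SketchClientError < StandardError; end

# Stand-in consumer that records calls and always fails to unsubscribe.
class SketchConsumer
  attr_reader :events

  def initialize
    @events = []
  end

  def unsubscribe
    @events << :unsubscribe
    raise SketchClientError, 'simulated unsubscribe failure'
  end

  def close
    @events << :close
  end
end

# Same yield/ensure structure as the listing above, reduced to the cleanup steps.
def with_sketch_consumer
  consumer = SketchConsumer.new
  yield(consumer)
ensure
  begin
    consumer&.unsubscribe
  rescue SketchClientError
    nil # ignore the failure so that close still runs
  end

  consumer&.close
end
```

Even though `unsubscribe` raises here, the block's return value is preserved and `close` is still recorded after `unsubscribe`.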