Module: Legion::Transport::Kafka::Admin
- Extended by: Logging::Helper
- Defined in: lib/legion/transport/kafka/admin.rb
Overview
Thin wrapper around rdkafka admin operations. Used internally to ensure topics exist before publishing/subscribing.
Class Method Summary
- .ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Boolean
Idempotently create a topic.
Class Method Details
.ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Boolean
Idempotently create a topic. Returns true if the topic was created or already exists; raises AdminError on any other failure.
# File 'lib/legion/transport/kafka/admin.rb', line 23

def ensure_topic(topic, partitions: 1, replication_factor: 1, config: {})
  admin = build_admin
  handle = admin.create_topic(topic, partitions, replication_factor, config)
  handle.wait(max_wait_timeout: operation_timeout)
  log_created(topic, partitions)
  true
rescue ::Rdkafka::RdkafkaError => e
  handle_exception(e, level: :warn, handled: true,
                      operation: 'transport.kafka.admin.ensure_topic',
                      topic: topic, partitions: partitions)
  # Error code 36 = TOPIC_ALREADY_EXISTS; not a real error for our purposes.
  return true if e.respond_to?(:code) && e.code == :topic_already_exists
  return true if e.message.to_s.include?('TOPIC_ALREADY_EXISTS') ||
                 e.message.to_s.include?('Topic already exists')

  raise Legion::Transport::Kafka::AdminError, "ensure_topic(#{topic}) failed: #{e.message}"
rescue StandardError => e
  handle_exception(e, level: :error, handled: false,
                      operation: 'transport.kafka.admin.ensure_topic',
                      topic: topic, partitions: partitions)
  raise Legion::Transport::Kafka::AdminError, "ensure_topic(#{topic}) failed: #{e.message}"
ensure
  safely_close_admin(admin)
end
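The control flow above (create, treat "already exists" as success, wrap every other failure, always close the admin client) can be tried without a broker. The following is only a minimal sketch under stated assumptions: FakeKafkaError, FakeAdmin, AdminError, and ensure_topic_sketch are hypothetical stand-ins written for this example and are not part of rdkafka or Legion. The fake admin rejects duplicate topic names in memory, roughly as a real cluster would.

```ruby
# Hypothetical stand-ins; none of these classes come from rdkafka or Legion.
class FakeKafkaError < StandardError
  attr_reader :code

  def initialize(code, message)
    super(message)
    @code = code
  end
end

class AdminError < StandardError; end

# In-memory "broker" that rejects duplicate topic names.
class FakeAdmin
  def initialize(existing_topics)
    @topics = existing_topics
    @closed = false
  end

  def closed?
    @closed
  end

  def create_topic(name, partitions, replication_factor, config = {})
    raise FakeKafkaError.new(:invalid_partitions, 'Invalid partitions') if partitions < 1
    raise FakeKafkaError.new(:topic_already_exists, 'Topic already exists') if @topics.key?(name)

    @topics[name] = { partitions: partitions, replication_factor: replication_factor, config: config }
  end

  def close
    @closed = true
  end
end

# Same shape as ensure_topic: create, swallow "already exists", wrap anything else.
def ensure_topic_sketch(admin, topic, partitions: 1, replication_factor: 1, config: {})
  admin.create_topic(topic, partitions, replication_factor, config)
  true
rescue FakeKafkaError => e
  return true if e.code == :topic_already_exists

  raise AdminError, "ensure_topic(#{topic}) failed: #{e.message}"
ensure
  # Runs on every path, including the early return above.
  admin.close
end

topics = {}
first  = ensure_topic_sketch(FakeAdmin.new(topics), 'events') # topic is created
second = ensure_topic_sketch(FakeAdmin.new(topics), 'events') # topic already exists
puts first, second
```

Both calls return true, which is the property that makes the method safe to call before every publish or subscribe. A failure unrelated to topic existence (here, a partition count of 0) surfaces as AdminError instead.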