Module: Legion::Transport::Kafka::Admin

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/kafka/admin.rb

Overview

Thin wrapper around rdkafka admin operations. Used internally to ensure topics exist before publishing/subscribing.

Class Method Summary collapse

Class Method Details

.ensure_topic(topic, partitions: 1, replication_factor: 1, config: {}) ⇒ Boolean

Idempotently create a topic. Returns true if the topic was created or already exists; raises AdminError on any other failure.

Parameters:

  • topic (String)
  • partitions (Integer) (defaults to: 1)
  • replication_factor (Integer) (defaults to: 1)
  • config (Hash) (defaults to: {})

    topic-level config (e.g. 'retention.ms' => '604800000')

Returns:

  • (Boolean)


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/legion/transport/kafka/admin.rb', line 23

# Idempotently create a Kafka topic.
#
# Asks the admin client for a create-topic handle and waits on it, passing
# +operation_timeout+ as the maximum wait. An "already exists" error is treated
# as success; every other error is wrapped in AdminError. The admin client is
# always handed to +safely_close_admin+ afterwards, even if building it failed.
#
# @param topic [String] name of the topic to create
# @param partitions [Integer] partition count passed to the create request
# @param replication_factor [Integer] replication factor passed to the create request
# @param config [Hash] topic-level config (e.g. 'retention.ms' => '604800000')
# @return [Boolean] true when the topic was created or already exists
# @raise [Legion::Transport::Kafka::AdminError] on any other failure
def ensure_topic(topic, partitions: 1, replication_factor: 1, config: {})
  admin  = build_admin
  handle = admin.create_topic(topic, partitions, replication_factor, config)
  handle.wait(max_wait_timeout: operation_timeout)
  log_created(topic, partitions)
  true
rescue ::Rdkafka::RdkafkaError => e
  # Classify before reporting so the log entry reflects what really happens to
  # the error: "already exists" is swallowed, everything else is re-raised.
  message        = e.message.to_s
  already_exists = (e.respond_to?(:code) && e.code == :topic_already_exists) ||
                   message.include?('TOPIC_ALREADY_EXISTS') ||
                   message.include?('Topic already exists')

  handle_exception(e, level: already_exists ? :warn : :error, handled: already_exists,
                   operation: 'transport.kafka.admin.ensure_topic',
                   topic: topic, partitions: partitions)
  # TOPIC_ALREADY_EXISTS is not a real error for our purposes.
  return true if already_exists

  raise Legion::Transport::Kafka::AdminError, "ensure_topic(#{topic}) failed: #{e.message}"
rescue StandardError => e
  handle_exception(e, level: :error, handled: false, operation: 'transport.kafka.admin.ensure_topic',
                   topic: topic, partitions: partitions)
  raise Legion::Transport::Kafka::AdminError, "ensure_topic(#{topic}) failed: #{e.message}"
ensure
  safely_close_admin(admin)
end