Class: Rdkafka::Metadata

Inherits:
Object
Defined in:
lib/rdkafka/metadata.rb

Overview

Provides cluster metadata information

Defined Under Namespace

Classes: BrokerMetadata, CustomFFIStruct, Metadata, PartitionMetadata, TopicMetadata

Constructor Details

#initialize(native_client, topic_name = nil, timeout_ms = Defaults::METADATA_TIMEOUT_MS) ⇒ Metadata

Fetches metadata from the Kafka cluster

Parameters:

  • native_client (FFI::Pointer)

    pointer to the native Kafka client

  • topic_name (String, nil) (defaults to: nil)

    specific topic to fetch metadata for, or nil for all topics

  • timeout_ms (Integer) (defaults to: Defaults::METADATA_TIMEOUT_MS)

    timeout in milliseconds

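Raises:

  • (Rdkafka::RdkafkaError)

    if the metadata request fails with a non-retryable error, or if the retry limit is exceeded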

# File 'lib/rdkafka/metadata.rb', line 25

def initialize(native_client, topic_name = nil, timeout_ms = Defaults::METADATA_TIMEOUT_MS)
  attempt ||= 0
  attempt += 1

  native_topic = if topic_name
    Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
  end

  ptr = FFI::MemoryPointer.new(:pointer)

  # If topic_flag is 1, we request info about *all* topics in the cluster.  If topic_flag is 0,
  # we only request info about locally known topics (or a single topic if one is passed in).
  topic_flag = topic_name.nil? ? 1 : 0

  # Retrieve the Metadata
  result = Rdkafka::Bindings.rd_kafka_metadata(native_client, topic_flag, native_topic, ptr, timeout_ms)

  # Error Handling
  raise Rdkafka::RdkafkaError.new(result) unless result.zero?

  metadata_from_native(ptr.read_pointer)
rescue ::Rdkafka::RdkafkaError => e
  raise unless RETRIED_ERRORS.include?(e.code)
  raise if attempt > Defaults::METADATA_MAX_RETRIES

  backoff_factor = 2**attempt
  timeout_ms = backoff_factor * Defaults::METADATA_RETRY_BACKOFF_BASE_MS

  sleep(timeout_ms / 1000.0)

  retry
ensure
  Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic) if topic_name
  Rdkafka::Bindings.rd_kafka_metadata_destroy(ptr.read_pointer)
end
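
The retry path in the constructor can be read as an exponential backoff loop: each retryable failure waits 2**attempt times a base interval before trying again, and the error is re-raised once the attempt count exceeds the retry limit. The following is a minimal, self-contained sketch of that pattern only. TransientError, with_backoff, the sleeper parameter, and the constant values are hypothetical stand-ins for illustration; they are not part of Rdkafka, and the real values of Defaults::METADATA_MAX_RETRIES and Defaults::METADATA_RETRY_BACKOFF_BASE_MS are not shown on this page.

```ruby
# Hypothetical stand-ins for Defaults::METADATA_MAX_RETRIES and
# Defaults::METADATA_RETRY_BACKOFF_BASE_MS (assumed values, not the real ones).
MAX_RETRIES = 3
BACKOFF_BASE_MS = 100

# Hypothetical error class playing the role of a retryable RdkafkaError.
class TransientError < StandardError; end

# Runs the block and retries on TransientError, waiting
# 2**attempt * base_ms milliseconds between attempts.
# `sleeper` is injectable so the wait schedule can be observed without sleeping.
def with_backoff(max_retries: MAX_RETRIES, base_ms: BACKOFF_BASE_MS,
                 sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue TransientError
    # Give up once the attempt count exceeds the limit, as the constructor does.
    raise if attempt > max_retries

    sleeper.call((2**attempt) * base_ms / 1000.0)
    retry
  end
end

# Usage: fail twice, succeed on the third attempt, and record the waits.
waits = []
result = with_backoff(sleeper: ->(seconds) { waits << seconds }) do |attempt|
  raise TransientError, "metadata not ready" if attempt < 3

  "ok after #{attempt} attempts"
end
```

Because the attempt counter is incremented before the first call, the first wait in this scheme is already twice the base interval, and a limit of N allows up to N + 1 calls in total.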

Instance Attribute Details

#brokers ⇒ Array<Hash> (readonly)

Returns list of broker metadata.

Returns:

  • (Array<Hash>)

    list of broker metadata



# File 'lib/rdkafka/metadata.rb', line 7

def brokers
  @brokers
end

#topics ⇒ Array<Hash> (readonly)

Returns list of topic metadata.

Returns:

  • (Array<Hash>)

    list of topic metadata



# File 'lib/rdkafka/metadata.rb', line 9

def topics
  @topics
end
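
Since both readers return plain arrays of hashes, they can be processed with ordinary Enumerable methods. The sketch below uses hand-written sample data in place of a live Metadata instance. The hash keys (:broker_id, :broker_name, :broker_port, :topic_name, :partition_count) are assumptions made for illustration; this page does not list the keys, so check them against an actual result before relying on them.

```ruby
# Sample data standing in for Metadata#brokers and Metadata#topics.
# The keys used here are assumed for illustration, not confirmed by this page.
brokers = [
  { broker_id: 1, broker_name: "kafka-1", broker_port: 9092 },
  { broker_id: 2, broker_name: "kafka-2", broker_port: 9092 }
]

topics = [
  { topic_name: "events", partition_count: 3 },
  { topic_name: "audit", partition_count: 1 }
]

# Build "host:port" strings for every broker.
broker_addresses = brokers.map { |b| "#{b[:broker_name]}:#{b[:broker_port]}" }

# Map each topic name to its partition count.
partitions_by_topic = topics.map { |t| [t[:topic_name], t[:partition_count]] }.to_h

puts broker_addresses.join(", ")
partitions_by_topic.each { |name, count| puts "#{name}: #{count} partition(s)" }
```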