Class: Rdkafka::Producer::PartitionsCountCache

Inherits: Object
Includes: Helpers::Time
Defined in: lib/rdkafka/producer/partitions_count_cache.rb

Overview

Caching mechanism for Kafka topic partition counts to avoid frequent cluster queries.

This cache is designed to optimize the process of obtaining partition counts for topics. It uses several strategies to minimize Kafka cluster queries:

  1. Statistics-based updates: When statistics callbacks are enabled (via `statistics.interval.ms`), we leverage this data to proactively update the partition counts cache. This approach costs approximately 0.02ms of processing time during each statistics interval (typically every 5 seconds) but eliminates the need for explicit blocking metadata queries.

  2. Edge case handling: If a user configures `statistics.interval.ms` much higher than the default cache TTL (Defaults::PARTITIONS_COUNT_CACHE_TTL_MS), the cache will still function correctly. When statistics updates don't occur frequently enough, the cache entries will expire naturally, triggering a blocking refresh when needed.

  3. User configuration awareness: The cache respects user-defined settings. If `topic.metadata.refresh.interval.ms` is set very high, the responsibility for potentially stale data falls on the user. This is an explicit design choice to honor user configuration preferences and align with librdkafka settings.

  4. Process-wide efficiency: Since this cache is shared across all Rdkafka producers and consumers within a process, having multiple clients improves overall efficiency. Each client contributes to keeping the cache updated, benefiting all other clients.

  5. Thread-safety approach: The implementation uses fine-grained locking with per-topic mutexes to minimize contention in multi-threaded environments while ensuring data consistency.

  6. Topic recreation handling: If a topic is deleted and recreated with fewer partitions, the cache will continue to report the higher count until either the TTL expires or the process is restarted. This design choice simplifies the implementation while relying on librdkafka's error handling for edge cases. In production environments, topic recreation with different partition counts is typically accompanied by application restarts to handle structural changes. This also aligns with the previous cache implementation.
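The fine-grained locking from point 5 can be sketched as a small standalone pattern (the class and method names below are illustrative, not the library's actual internals): one mutex guards only the lazy creation of per-topic mutexes, so contention on one topic never blocks work on another.

```ruby
# Illustrative sketch of per-topic locking; names are hypothetical.
class PerTopicLocks
  def initialize
    @mutex_hash = {}
    # Guards only mutex creation, never held while topic work runs
    @mutex_for_hash = Mutex.new
  end

  # Lazily creates and memoizes exactly one mutex per topic
  def mutex_for(topic)
    @mutex_for_hash.synchronize { @mutex_hash[topic] ||= Mutex.new }
  end
end

locks = PerTopicLocks.new
locks.mutex_for("events").synchronize do
  # work scoped to "events"; other topics remain unblocked
end
```

The global mutex is held only for the hash lookup and mutex allocation, which keeps the serialization window tiny regardless of how long any per-topic critical section runs.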

Instance Method Summary

Methods included from Helpers::Time

#monotonic_now, #monotonic_now_ms

Constructor Details

#initialize(ttl = :not_provided, ttl_ms: :not_provided) ⇒ PartitionsCountCache

Creates a new partition count cache

Parameters:

  • ttl (Integer, nil) (defaults to: :not_provided)

    DEPRECATED: Use ttl_ms instead. Time-to-live in seconds for cached values. Will be removed in v1.0.0.

  • ttl_ms (Integer, nil) (defaults to: :not_provided)

    Time-to-live in milliseconds for cached values. Defaults to Defaults::PARTITIONS_COUNT_CACHE_TTL_MS.



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 55

def initialize(ttl = :not_provided, ttl_ms: :not_provided)
  @counts = {}
  @mutex_hash = {}
  # Used only for @mutex_hash access to ensure thread-safety when creating new mutexes
  @mutex_for_hash = Mutex.new

  # Determine which TTL value to use
  if ttl != :not_provided && ttl_ms != :not_provided
    warn "DEPRECATION WARNING: Both ttl and ttl_ms were provided to PartitionsCountCache. " \
         "Using ttl_ms. The ttl parameter is deprecated and will be removed in v1.0.0."
    @ttl_ms = ttl_ms
  elsif ttl != :not_provided
    warn "DEPRECATION WARNING: ttl (seconds) parameter for PartitionsCountCache is deprecated. " \
         "Use ttl_ms (milliseconds) instead. This parameter will be removed in v1.0.0."
    @ttl_ms = (ttl * 1000).to_i
  elsif ttl_ms == :not_provided
    @ttl_ms = Defaults::PARTITIONS_COUNT_CACHE_TTL_MS
  else
    @ttl_ms = ttl_ms
  end
end
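The precedence rules in the constructor can be condensed into a standalone helper. This is a sketch, not the library's API: `resolve_ttl_ms` is a hypothetical name, and `DEFAULT_TTL_MS = 30_000` is an assumed stand-in for `Defaults::PARTITIONS_COUNT_CACHE_TTL_MS`.

```ruby
# Sketch of the ttl/ttl_ms precedence rules; DEFAULT_TTL_MS is an assumed
# stand-in for Defaults::PARTITIONS_COUNT_CACHE_TTL_MS.
DEFAULT_TTL_MS = 30_000

def resolve_ttl_ms(ttl: :not_provided, ttl_ms: :not_provided)
  if ttl != :not_provided && ttl_ms != :not_provided
    ttl_ms                  # ttl_ms wins when both are supplied
  elsif ttl != :not_provided
    (ttl * 1000).to_i       # legacy seconds value, converted to milliseconds
  elsif ttl_ms == :not_provided
    DEFAULT_TTL_MS          # neither given: fall back to the default
  else
    ttl_ms
  end
end

resolve_ttl_ms(ttl: 5)                 # => 5000
resolve_ttl_ms(ttl: 5, ttl_ms: 2_000)  # => 2000 (ttl_ms takes precedence)
```

Using the `:not_provided` sentinel rather than `nil` lets the cache distinguish "argument omitted" from an explicit `nil`, which is why both deprecation branches can fire reliably.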

Instance Method Details

#get(topic) { ... } ⇒ Integer

Note:

The implementation prioritizes read performance over write consistency since partition counts typically only increase during normal operation.

Reads partition count for a topic with automatic refresh when expired

This method will return the cached partition count if available and not expired. If the value is expired or not available, it will execute the provided block to fetch the current value from Kafka.

Parameters:

  • topic (String)

    Kafka topic name

Yields:

  • Block that returns the current partition count when cache needs refreshing

Yield Returns:

  • (Integer)

    Current partition count retrieved from Kafka

Returns:

  • (Integer)

    Partition count for the topic



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 90

def get(topic)
  current_info = @counts[topic]

  if current_info.nil? || expired?(current_info[0])
    new_count = yield

    if current_info.nil?
      # No existing data, create a new entry with mutex
      set(topic, new_count)

      return new_count
    else
      current_count = current_info[1]

      if new_count > current_count
        # Higher value needs mutex to update both timestamp and count
        set(topic, new_count)

        return new_count
      else
        # Same or lower value, just update timestamp without mutex
        refresh_timestamp(topic)

        return current_count
      end
    end
  end

  current_info[1]
end
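The refresh semantics — serve the cached value while fresh, invoke the block on a miss or expiry, ignore lower counts but refresh their timestamp — can be exercised with a self-contained miniature. This is a sketch, not the class documented here: it omits the per-topic locking for brevity.

```ruby
# Minimal sketch of the #get refresh semantics (no locking, for brevity).
class TinyPartitionCache
  def initialize(ttl_ms)
    @ttl_ms = ttl_ms
    @counts = {} # topic => [timestamp_ms, count]
  end

  def monotonic_now_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end

  def get(topic)
    info = @counts[topic]
    # Fresh entry: serve from cache without calling the block
    return info[1] if info && monotonic_now_ms - info[0] < @ttl_ms

    fresh = yield
    if info.nil? || fresh > info[1]
      # Missing or higher: store the new count with a fresh timestamp
      @counts[topic] = [monotonic_now_ms, fresh]
      fresh
    else
      # Same or lower: keep the old count, only refresh the timestamp
      info[0] = monotonic_now_ms
      info[1]
    end
  end
end
```

Note how a lower fetched count never replaces the cached one: partition counts only grow in normal Kafka operation, so a smaller value is treated as stale metadata rather than truth.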

#set(topic, new_count) ⇒ Object

Note:

We prioritize higher partition counts and only accept them when using a mutex to ensure consistency. This design decision is based on the fact that partition counts in Kafka only increase during normal operation.

Update partition count for a topic when needed

This method updates the partition count for a topic in the cache. It uses a mutex to ensure thread-safety during updates.

Parameters:

  • topic (String)

    Kafka topic name

  • new_count (Integer)

    New partition count value



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 132

def set(topic, new_count)
  # First check outside mutex to avoid unnecessary locking
  current_info = @counts[topic]

  # For lower values, we don't update count but might need to refresh timestamp
  if current_info && new_count < current_info[1]
    refresh_timestamp(topic)

    return
  end

  # Only lock the specific topic mutex
  mutex_for(topic).synchronize do
    # Check again inside the lock as another thread might have updated
    current_info = @counts[topic]

    if current_info.nil?
      # Create new entry
      @counts[topic] = [monotonic_now_ms, new_count]
    else
      current_count = current_info[1]

      if new_count > current_count
        # Update to higher count value
        current_info[0] = monotonic_now_ms
        current_info[1] = new_count
      else
        # Same or lower count, update timestamp only
        current_info[0] = monotonic_now_ms
      end
    end
  end
end
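The check-lock-recheck shape above is classic double-checked locking. Stripped of timestamps and per-topic mutexes, the core invariant looks like this (a sketch with hypothetical names, not the library's code):

```ruby
# Double-checked update: cheap read outside the lock, re-verified inside it.
class MonotonicCounts
  def initialize
    @mutex = Mutex.new
    @counts = {}
  end

  def update_if_higher(topic, new_count)
    current = @counts[topic]
    # Fast path: lower-or-equal values never need the lock
    return current if current && new_count <= current

    @mutex.synchronize do
      current = @counts[topic]
      # Re-check: another thread may have stored a higher value meanwhile
      @counts[topic] = new_count if current.nil? || new_count > current
      @counts[topic]
    end
  end
end
```

The unlocked first check is safe precisely because the stored value is monotonically non-decreasing: a stale read can only cause a harmless extra trip into the lock, never a wrong final value.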

#to_h ⇒ Hash

Returns the raw cache storage: a hash mapping each topic to its [monotonic timestamp, partition count] pair.

Returns:

  • (Hash)

    hash mapping each topic to its [monotonic timestamp, partition count] pair



# File 'lib/rdkafka/producer/partitions_count_cache.rb', line 167

def to_h
  @counts
end