Class: Karafka::Pro::Processing::ConsumerGroups::OffsetMetadata::Fetcher
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::ConsumerGroups::OffsetMetadata::Fetcher
- Extended by:
- Forwardable
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/processing/consumer_groups/offset_metadata/fetcher.rb
Overview
This fetcher is responsible for fetching and caching committed offsets metadata information.
By design we fetch all information for a requested topic assignments. Not all topics from the same subscription group may need metadata and even if, we can run the few smaller queries. This approach prevents us from querying all assigned topics data in one go preventing excessive queries.
Since the assumption is, that user will not have to reach out for the later metadata since it is produced in the context of a given consumer assignment, we can cache the initial result and only allow users for explicit invalidation.
Instance Method Summary collapse
-
#clear(subscription_group) ⇒ Object
Clears cache of a given subscription group.
-
#find(topic, partition, cache: true) ⇒ Object, false
Queries or retrieves from cache the given offset metadata for the selected partition.
-
#initialize ⇒ Fetcher
constructor
Initializes the fetcher with empty caches.
-
#register(client) ⇒ Object
Registers a client of a given subscription group, so we can use it for queries later on.
Constructor Details
#initialize ⇒ Fetcher
Initializes the fetcher with empty caches
61 62 63 64 65 |
# File 'lib/karafka/pro/processing/consumer_groups/offset_metadata/fetcher.rb', line 61 def initialize @mutexes = {} @clients = {} @tpls = {} end |
Instance Method Details
#clear(subscription_group) ⇒ Object
Clears cache of a given subscription group. It is triggered on assignment changes.
109 110 111 112 113 |
# File 'lib/karafka/pro/processing/consumer_groups/offset_metadata/fetcher.rb', line 109 def clear(subscription_group) @mutexes.fetch(subscription_group).synchronize do @tpls[subscription_group].clear end end |
#find(topic, partition, cache: true) ⇒ Object, false
Queries or retrieves from cache the given offset metadata for the selected partition
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/karafka/pro/processing/consumer_groups/offset_metadata/fetcher.rb', line 88 def find(topic, partition, cache: true) cache = topic..cache? && cache tpls = fetch(topic, cache) return false unless tpls t_partitions = tpls.fetch(topic.name, []) t_partition = t_partitions.find { |t_p| t_p.partition == partition } # If we do not have given topic partition here, it means it is no longer part of our # assignment and we should return false return false unless t_partition topic..deserializer.call(t_partition.) end |
#register(client) ⇒ Object
Since we store the client reference and not the underlying rdkafka consumer instance, we do not have to deal with the recovery as it is abstracted away
Registers a client of a given subscription group, so we can use it for queries later on
71 72 73 74 75 76 |
# File 'lib/karafka/pro/processing/consumer_groups/offset_metadata/fetcher.rb', line 71 def register(client) @clients[client.subscription_group] = client # We use one mutex per SG because independent SGs can query in parallel @mutexes[client.subscription_group] = Mutex.new @tpls[client.subscription_group] = {} end |