Class: Karafka::Pro::Processing::OffsetMetadata::Fetcher

Inherits:
Object
Extended by:
Forwardable
Includes:
Singleton
Defined in:
lib/karafka/pro/processing/offset_metadata/fetcher.rb

Overview

This fetcher is responsible for fetching and caching committed offset metadata information.

By design we fetch information for all assignments of a requested topic at once. Not all topics from the same subscription group may need metadata, and even when several do, we can run a few smaller per-topic queries. This approach prevents us from querying data for all assigned topics in one go and avoids excessive queries.

Since the metadata is produced in the context of a given consumer assignment, the assumption is that the user will not need to reach for it again later, so we cache the initial result and only allow explicit invalidation.
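The caching approach described above (one cache bucket and one mutex per subscription group, with explicit invalidation) can be sketched in a minimal, framework-free form. The `MetadataCache` class and its method names below are illustrative assumptions, not part of the Karafka API:

```ruby
# Minimal sketch of the per-group cache-with-explicit-invalidation pattern.
# Illustrative only; not the Karafka implementation.
class MetadataCache
  def initialize
    @mutexes = {}
    @store = {}
  end

  # Each group gets its own mutex and cache bucket so that
  # independent groups do not block each other
  def register(group)
    @mutexes[group] = Mutex.new
    @store[group] = {}
  end

  # Returns the cached value for the key or computes and caches it
  def fetch(group, key)
    @mutexes.fetch(group).synchronize do
      @store[group][key] ||= yield
    end
  end

  # Explicit invalidation, e.g. on assignment changes
  def clear(group)
    @mutexes.fetch(group).synchronize { @store[group].clear }
  end
end
```

The key property is that the expensive query block runs only once per key until `clear` is called for the group.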

Instance Method Summary

Constructor Details

#initialize ⇒ Fetcher

Initializes the fetcher with empty caches

# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 57

def initialize
  @mutexes = {}
  @clients = {}
  @tpls = {}
end

Instance Method Details

#clear(subscription_group) ⇒ Object

Clears cache of a given subscription group. It is triggered on assignment changes.

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group for which we want to clear the cached data
# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 105

def clear(subscription_group)
  @mutexes.fetch(subscription_group).synchronize do
    @tpls[subscription_group].clear
  end
end

#find(topic, partition, cache: true) ⇒ Object, false

Queries or retrieves from cache the given offset metadata for the selected partition

Parameters:

  • topic (Karafka::Routing::Topic)

    routing topic with subscription group reference

  • partition (Integer)

    partition for which we want to get stored offset metadata

  • cache (Boolean) (defaults to: true)

    when false, forces an explicit query to Kafka and a cache refresh. By default we use the setting from the topic level, but this can be overridden on a per-request basis if needed.

Returns:

  • (Object, false)

    deserialized metadata (string deserializer by default) or false when we were not able to obtain the details because the assignment was lost

# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 84

def find(topic, partition, cache: true)
  cache = topic.offset_metadata.cache? && cache

  tpls = fetch(topic, cache)

  return false unless tpls

  t_partitions = tpls.fetch(topic.name, [])
  t_partition = t_partitions.find { |t_p| t_p.partition == partition }

  # If we do not have given topic partition here, it means it is no longer part of our
  # assignment and we should return false
  return false unless t_partition

  topic.offset_metadata.deserializer.call(t_partition.metadata)
end
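The lookup step of #find, i.e. locating the partition in the fetched topic-partition list and returning false when it is no longer assigned, can be sketched without the framework. `TP`, `find_metadata` and the identity deserializer below are illustrative assumptions, not Karafka API:

```ruby
# Sketch of the lookup/deserialization step described above.
# Illustrative only; real code works on rdkafka topic partition lists.
TP = Struct.new(:partition, :metadata)

def find_metadata(tpls, topic_name, partition, deserializer = ->(m) { m })
  t_partition = tpls.fetch(topic_name, []).find { |tp| tp.partition == partition }
  # Missing partition means it is no longer part of our assignment
  return false unless t_partition

  deserializer.call(t_partition.metadata)
end
```

A lookup for an unassigned partition or an unknown topic returns false rather than raising, matching the documented return contract.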

#register(client) ⇒ Object

Note:

Since we store the client reference and not the underlying rdkafka consumer instance, we do not have to deal with the recovery as it is abstracted away

Registers a client of a given subscription group, so we can use it for queries later on

Parameters:

  • client (Karafka::Connection::Client)

    connection client of a given subscription group
# File 'lib/karafka/pro/processing/offset_metadata/fetcher.rb', line 67

def register(client)
  @clients[client.subscription_group] = client
  # We use one mutex per SG because independent SGs can query in parallel
  @mutexes[client.subscription_group] = Mutex.new
  @tpls[client.subscription_group] = {}
end
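The one-mutex-per-subscription-group design from the comment above can be illustrated with a minimal sketch: each group synchronizes on its own lock, so independent groups can run their queries in parallel while calls within a single group serialize. Names below are illustrative:

```ruby
# Sketch: per-group mutexes let independent groups proceed concurrently.
# Illustrative only; not the Karafka implementation.
mutexes = { sg1: Mutex.new, sg2: Mutex.new }
results = []

threads = [:sg1, :sg2].map do |group|
  Thread.new do
    # Each thread only contends on its own group's mutex
    mutexes.fetch(group).synchronize { results << group }
  end
end
threads.each(&:join)
```

A single global mutex would instead serialize queries across all subscription groups, which the per-group design avoids.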