Class: Karafka::Routing::SubscriptionGroupsBuilder
- Inherits:
-
Object
- Object
- Karafka::Routing::SubscriptionGroupsBuilder
- Defined in:
- lib/karafka/routing/subscription_groups_builder.rb
Overview
rdkafka allows us to group topics subscriptions when they have same settings. This builder groups topics from a single consumer group into subscription groups that can be subscribed with one rdkafka connection. This way we save resources as having several rdkafka consumers under the hood is not the cheapest thing in a bigger system.
In general, if we can, we try to subscribe to as many topics with one rdkafka connection as possible, but if not possible, we divide.
Instance Method Summary collapse
-
#call(topics, base_position: nil) ⇒ Array<SubscriptionGroup>
All subscription groups we need in separate threads.
-
#initialize ⇒ SubscriptionGroupsBuilder
constructor
Initializes the subscription groups builder.
Constructor Details
#initialize ⇒ SubscriptionGroupsBuilder
Initializes the subscription groups builder
28 29 30 |
# File 'lib/karafka/routing/subscription_groups_builder.rb', line 28 def initialize @position = -1 end |
Instance Method Details
#call(topics, base_position: nil) ⇒ Array<SubscriptionGroup>
Returns all subscription groups we need in separate threads.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/karafka/routing/subscription_groups_builder.rb', line 39 def call(topics, base_position: nil) # If base_position is provided, use it for stable rebuilding (consumer group reopening). # Otherwise continue from global counter for new subscription groups. # We subtract 1 from base_position because position is incremented in the map block below. # This ensures the first subscription group gets the correct base_position value. use_base = !base_position.nil? position = use_base ? base_position - 1 : @position topics .map { |topic| [checksum(topic), topic] } .group_by(&:first) .values .map { |value| value.map(&:last) } .flat_map { |value| (value) } .map { |grouped_topics| SubscriptionGroup.new(position += 1, grouped_topics) } .tap do |subscription_groups| # Always ensure global counter is at least as high as the highest position used. # This prevents position collisions when new consumer groups are created after # existing ones are rebuilt with base_position. @position = position if position > @position subscription_groups.each do |subscription_group| subscription_group.topics.each do |topic| topic.subscription_group = subscription_group end end end end |