Class: Karafka::Routing::ConsumerGroup

Inherits:
Object
Defined in:
lib/karafka/routing/consumer_group.rb

Overview

Note:

A single consumer group represents a Kafka consumer group, but it may not match 1:1 with subscription groups. There can be more subscription groups than consumer groups.

Object used to describe a single consumer group that is going to subscribe to given topics. It is a part of Karafka’s DSL.

Constructor Details

#initialize(name) ⇒ ConsumerGroup

Returns a new instance of ConsumerGroup.

Parameters:

  • name (String, Symbol)

    name of this consumer group.



# File 'lib/karafka/routing/consumer_group.rb', line 26

def initialize(name)
  @name = name.to_s
  # This used to be different when consumer mappers existed but now it is the same
  @id = @name
  @topics = Topics.new([])
  # Initialize the subscription group so there's always a value for it, since even if not
  # defined directly, a subscription group will be created
  @current_subscription_group_details = { name: SubscriptionGroup.id }
  # Track the base position for subscription groups to ensure stable positions when
  # rebuilding. This is critical for static group membership in swarm mode
  @subscription_groups_base_position = nil
end
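
As a rough illustration of the name handling in the constructor, the sketch below uses a hypothetical stand-in class (`SketchConsumerGroup` is not part of Karafka). It mirrors only the `name.to_s` cast and the `id` assignment from the source above and leaves out topics and subscription group state.

```ruby
# Hypothetical stand-in, not Karafka's class: mirrors only the name/id handling
class SketchConsumerGroup
  attr_reader :id, :name

  def initialize(name)
    # Symbols and strings are both accepted; the stored name is always a String
    @name = name.to_s
    # The id simply mirrors the name
    @id = @name
  end
end

from_symbol = SketchConsumerGroup.new(:orders)
from_string = SketchConsumerGroup.new('orders')

puts from_symbol.name.class
puts from_symbol.id == from_string.id
```

Under this assumption, a group declared with a symbol and one declared with the equivalent string end up with the same name and id.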

Instance Attribute Details

#current_subscription_group_details ⇒ Object

This is a “virtual” attribute that does not build subscription groups itself. It allows us to store the “current” subscription group defined in the routing. This subscription group id is then injected into topics, so we can compute the subscription groups.



# File 'lib/karafka/routing/consumer_group.rb', line 23

def current_subscription_group_details
  @current_subscription_group_details
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/consumer_group.rb', line 17

def id
  @id
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/routing/consumer_group.rb', line 17

def name
  @name
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



# File 'lib/karafka/routing/consumer_group.rb', line 17

def topics
  @topics
end

Instance Method Details

#active? ⇒ Boolean

Returns true if this consumer group should be active in our current process.

Returns:

  • (Boolean)

    true if this consumer group should be active in our current process



# File 'lib/karafka/routing/consumer_group.rb', line 40

def active?
  activity_manager.active?(:consumer_groups, name)
end

#subscription_group=(name = SubscriptionGroup.id) ⇒ Object

Assigns the current subscription group id based on the defined one and allows for further topic definition.

Parameters:

  • name (String, Symbol) (defaults to: SubscriptionGroup.id)

    name of the current subscription group



# File 'lib/karafka/routing/consumer_group.rb', line 68

def subscription_group=(name = SubscriptionGroup.id, &)
  # We cast it to a string here so the routing also accepts symbols; it is validated
  # as a string later on anyway
  @current_subscription_group_details = { name: name.to_s }

  Proxy.new(self, &)

  # We need to reset the current subscription group after it is used, so it won't leak
  # outside to other topics that would be defined without a defined subscription group
  @current_subscription_group_details = { name: SubscriptionGroup.id }
end
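
The set-then-reset behaviour described in the comments above can be sketched in isolation. Everything here is a hypothetical stand-in: `ScopedGroup`, its `topic` method, and the fixed `DEFAULT_NAME` constant are assumptions made for illustration (the source uses `SubscriptionGroup.id` and a `Proxy` rather than `instance_eval`).

```ruby
# Hypothetical stand-in that mimics only the "current subscription group" scoping
class ScopedGroup
  # Assumed fixed placeholder; the source obtains this from SubscriptionGroup.id
  DEFAULT_NAME = 'default'

  attr_reader :assignments

  def initialize
    @current = { name: DEFAULT_NAME }
    @assignments = {}
  end

  def subscription_group(name = DEFAULT_NAME, &block)
    # Cast so that symbols and strings behave the same
    @current = { name: name.to_s }
    # The source evaluates the block through a Proxy; instance_eval stands in here
    instance_eval(&block)
    # Reset so later topics do not inherit this subscription group
    @current = { name: DEFAULT_NAME }
  end

  def topic(name)
    # Record which subscription group was current when the topic was defined
    @assignments[name.to_s] = @current[:name]
  end
end

group = ScopedGroup.new
group.topic(:a)
group.subscription_group(:fast) { topic(:b) }
group.topic(:c)

p group.assignments
```

In this sketch only the topic defined inside the block picks up the `fast` name; topics defined before and after fall back to the default.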

#subscription_groups ⇒ Array<Routing::SubscriptionGroup>

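Returns all the subscription groups built based on the consumer group topics.

Returns:

  • (Array<Routing::SubscriptionGroup>)

    all the subscription groups built based on the consumer group topics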


# File 'lib/karafka/routing/consumer_group.rb', line 82

def subscription_groups
  @subscription_groups ||= begin
    result = subscription_groups_builder.call(
      topics,
      base_position: @subscription_groups_base_position
    )

    # Store the base position from the first subscription group for future rebuilds.
    # This ensures stable positions for static group membership.
    @subscription_groups_base_position ||= result.first&.position

    result
  end
end
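
To show why remembering the base position matters, here is a minimal sketch with a hypothetical builder. `CountingBuilder` and `MemoGroup` are illustrative assumptions: the builder hands out positions from a running counter unless it is given a base, which is only a guess at the kind of instability the source comments refer to.

```ruby
# Hypothetical builder: positions come from a running counter unless a base is given
class CountingBuilder
  def initialize
    @counter = 0
  end

  def call(topics, base_position: nil)
    start = base_position || @counter
    @counter += topics.size
    topics.each_with_index.map { |name, i| { name: name, position: start + i } }
  end
end

# Hypothetical group mirroring the memoization pattern from the source above
class MemoGroup
  def initialize(builder)
    @builder = builder
    @topics = []
    @base_position = nil
  end

  def add_topic(name)
    # Adding a topic invalidates the memoized result
    @groups = nil
    @topics << name
  end

  def groups
    @groups ||= begin
      result = @builder.call(@topics, base_position: @base_position)
      # Remember the first position so later rebuilds start from the same place
      @base_position ||= result.first&.dig(:position)
      result
    end
  end
end

group = MemoGroup.new(CountingBuilder.new)
group.add_topic('a')
first_build = group.groups
group.add_topic('b')
second_build = group.groups

p first_build.map { |g| g[:position] }
p second_build.map { |g| g[:position] }
```

Without the stored base, the second build in this sketch would start from the builder's advanced counter instead of the original position.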

#to_h ⇒ Hash

Hashed version of the consumer group that can be used for validation purposes.

Returns:

  • (Hash)

    hash with consumer group attributes, including the topics inside of it serialized to hashes



# File 'lib/karafka/routing/consumer_group.rb', line 100

def to_h
  {
    topics: topics.map(&:to_h),
    id: id
  }.freeze
end
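
One detail worth keeping in mind is that `freeze` is shallow in Ruby. The sketch below uses hypothetical stand-ins (`ListedTopic` and `SnapshotGroup` are not Karafka classes) and follows only the shape of `#to_h` shown above.

```ruby
# Hypothetical stand-ins; only the shape of #to_h follows the source above
ListedTopic = Struct.new(:name)

class SnapshotGroup
  attr_reader :id, :topics

  def initialize(id, topics)
    @id = id
    @topics = topics
  end

  def to_h
    { topics: topics.map(&:to_h), id: id }.freeze
  end
end

group = SnapshotGroup.new('orders', [ListedTopic.new('events')])
snapshot = group.to_h

outer_frozen = snapshot.frozen?           # the outer hash is frozen
inner_frozen = snapshot[:topics].frozen?  # the nested array is a separate object

rejected = nil
begin
  snapshot[:id] = 'other'
rescue FrozenError => e
  # Writing to the frozen outer hash raises
  rejected = e.class
end
```

In this sketch the outer hash rejects writes while the nested topics array remains mutable, so code that relies on full immutability would need to freeze nested values as well.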

#topic=(name) ⇒ Karafka::Routing::Topic

Builds a topic representation inside of the current consumer group route.

Parameters:

  • name (String, Symbol)

    name of topic to which we want to subscribe

Returns:

  • (Karafka::Routing::Topic)

    the newly built topic



# File 'lib/karafka/routing/consumer_group.rb', line 47

def topic=(name, &)
  # Clear memoized subscription groups since adding a topic requires rebuilding them
  # This is critical for consumer group reopening across multiple draw calls
  @subscription_groups = nil

  topic = Topic.new(name, self)
  @topics << Proxy.new(
    topic,
    builder.defaults,
    &
  ).target
  built_topic = @topics.last
  # We overwrite it conditionally in case it was not set by the user inline in the topic
  # block definition
  built_topic.subscription_group_details ||= current_subscription_group_details
  built_topic
end
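
The conditional overwrite at the end of the method can be sketched on its own. `RoutedTopic` and `build_topic` are hypothetical names used for illustration only; the point is the `||=` assignment, which leaves a value set inside the block untouched and otherwise falls back to the group's current details.

```ruby
# Hypothetical stand-in for a topic that may or may not set its own details
RoutedTopic = Struct.new(:name, :subscription_group_details)

# Hypothetical helper mirroring the "set only if not set inline" step
def build_topic(name, current_details)
  topic = RoutedTopic.new(name.to_s)
  # The block plays the role of the inline topic definition
  yield(topic) if block_given?
  # Only fill in the group's current details when the block did not set any
  topic.subscription_group_details ||= current_details
  topic
end

inherited = build_topic(:events, { name: 'main' })
explicit = build_topic(:audit, { name: 'main' }) do |t|
  t.subscription_group_details = { name: 'slow' }
end

p inherited.subscription_group_details
p explicit.subscription_group_details
```

Here the first topic inherits the surrounding details, while the second keeps the value it set inline.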