Class: Karafka::Routing::SubscriptionGroup

Inherits:
Object
Defined in:
lib/karafka/routing/subscription_group.rb

Overview

Note:

One subscription group will always belong to one consumer group, but one consumer group can have multiple subscription groups.

Object representing a set of topics from a single consumer group that can be subscribed to together over one connection.


Constructor Details

#initialize(position, topics) ⇒ SubscriptionGroup

Returns the built subscription group.

Parameters:

  • position (Integer)

    position of this subscription group in the array of all subscription groups. We need this value for static group membership, which requires an identifier that stays unique and unchanged between restarts

  • topics (Karafka::Routing::Topics)

    all the topics that share the same key settings



# File 'lib/karafka/routing/subscription_group.rb', line 50

def initialize(position, topics)
  @details = topics.first.subscription_group_details
  @name = @details.fetch(:name)
  @group = topics.first.group
  # We include the owning group id here because we want to have unique ids of subscription
  # groups across the system. Otherwise user could set the same name for multiple
  # subscription groups in many groups effectively having same id for different entities
  @id = "#{@group.id}_#{@name}_#{position}"
  @position = position
  @topics = topics
  @kafka = build_kafka
end
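
The id composition in the constructor can be tried in isolation. The following is a minimal sketch and not Karafka's code: `FakeGroup` and `build_id` are hypothetical stand-ins that only mirror the string interpolation shown above, to illustrate why the owning group id is part of the result.

```ruby
# Hypothetical stand-in for a consumer group; only its id matters here.
FakeGroup = Struct.new(:id)

# Mirrors the interpolation used in the constructor above.
def build_id(group, name, position)
  "#{group.id}_#{name}_#{position}"
end

orders = FakeGroup.new("orders_app")
billing = FakeGroup.new("billing_app")

# Same subscription group name and position, but different owning groups,
# so the resulting ids do not collide.
puts build_id(orders, "default", 0)
puts build_id(billing, "default", 0)
```

Without the group id prefix, both calls would produce the same string, which is the collision the source comment warns about.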

Instance Attribute Details

#group ⇒ Object (readonly) Also known as: consumer_group

Returns the value of attribute group.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def group
  @group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def id
  @id
end

#kafka ⇒ Object (readonly)

Returns the value of attribute kafka.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def kafka
  @kafka
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def name
  @name
end

#position ⇒ Object (readonly)

Returns the value of attribute position.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def position
  @position
end

#topics ⇒ Object (readonly)

Returns the value of attribute topics.



# File 'lib/karafka/routing/subscription_group.rb', line 17

def topics
  @topics
end

Class Method Details

.id ⇒ String

Generates a new subscription group id that will be used for anonymous subscription groups.

Returns:

  • (String)

    hex(6) compatible reproducible id



# File 'lib/karafka/routing/subscription_group.rb', line 33

def id
  ID_MUTEX.synchronize do
    @group_counter ||= 0
    @group_counter += 1

    Digest::SHA256.hexdigest(
      @group_counter.to_s
    )[0..11]
  end
end
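
The counter-and-digest approach above can be reproduced outside of Karafka. This is a minimal sketch under stated assumptions: `AnonymousIdGenerator` is a hypothetical class that keeps the counter per instance rather than on the class, so that two "process runs" can be compared side by side. The 12 hex characters it returns have the same length as the output of `SecureRandom.hex(6)`, which is presumably what "hex(6) compatible" refers to.

```ruby
require "digest"

# Hypothetical standalone generator mirroring the method shown above.
class AnonymousIdGenerator
  def initialize
    @mutex = Mutex.new
    @counter = 0
  end

  # Returns the first 12 hex characters of the SHA256 digest of the
  # next counter value. The mutex keeps increments atomic across threads.
  def next_id
    @mutex.synchronize do
      @counter += 1
      Digest::SHA256.hexdigest(@counter.to_s)[0..11]
    end
  end
end

first_run = AnonymousIdGenerator.new
second_run = AnonymousIdGenerator.new

# Two fresh generators yield the same sequence, so ids stay stable
# as long as groups are defined in the same order.
puts first_run.next_id == second_run.next_id
```

Because the id depends only on the order of calls, reordering anonymous subscription groups in the routing would change which id each one receives.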

Instance Method Details

#active? ⇒ Boolean

Returns whether this subscription group is one of the active ones.

Returns:

  • (Boolean)

    whether this subscription group is one of the active ones



# File 'lib/karafka/routing/subscription_group.rb', line 84

def active?
  activity_manager.active?(:subscription_groups, name)
end

#assignments(_consumer) ⇒ false, Rdkafka::Consumer::TopicPartitionList

Returns List of tpls for direct assignments or false for the normal mode.

Parameters:

Returns:

  • (false, Rdkafka::Consumer::TopicPartitionList)

    List of tpls for direct assignments or false for the normal mode



# File 'lib/karafka/routing/subscription_group.rb', line 101

def assignments(_consumer)
  false
end

#group_id ⇒ String Also known as: consumer_group_id

Returns group id (the Kafka `group.id` value assigned to this subscription group's connection).

Returns:

  • (String)

    group id (the Kafka `group.id` value assigned to this subscription group's connection)



# File 'lib/karafka/routing/subscription_group.rb', line 65

def group_id
  kafka[:"group.id"]
end

#max_messages ⇒ Integer

Returns max messages fetched in a single go.

Returns:

  • (Integer)

    max messages fetched in a single go



# File 'lib/karafka/routing/subscription_group.rb', line 74

def max_messages
  @topics.first.max_messages
end

#max_wait_time ⇒ Integer

Returns max milliseconds we can wait for incoming messages.

Returns:

  • (Integer)

    max milliseconds we can wait for incoming messages



# File 'lib/karafka/routing/subscription_group.rb', line 79

def max_wait_time
  @topics.first.max_wait_time
end

#refresh ⇒ Object

Refreshes the configuration of this subscription group if needed based on the execution context.

Since the initial routing setup happens in the supervisor, it is inherited by the children. This leaves `group.instance.id` incomplete, because it has not been expanded with the proper node identifier. When running in swarm mode, this method rebuilds the configuration if needed.



# File 'lib/karafka/routing/subscription_group.rb', line 117

def refresh
  return unless node
  return unless kafka.key?(:"group.instance.id")

  @kafka = build_kafka
end
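
The two guard clauses above can be illustrated with a standalone sketch. Everything here is hypothetical: `RefreshableSettings` is not a Karafka class, and the way the node id is appended to `group.instance.id` is an assumption made for illustration only, since `build_kafka` is not shown on this page.

```ruby
# Hypothetical sketch of "rebuild only when a node is present and a
# static membership id is configured".
class RefreshableSettings
  attr_reader :kafka
  # Lets the example simulate a child process learning its node id.
  attr_writer :node_id

  def initialize(base)
    @base = base
    @node_id = nil
    @kafka = build
  end

  def refresh
    return unless @node_id
    return unless @kafka.key?(:"group.instance.id")

    @kafka = build
  end

  private

  # ASSUMPTION: appends the node id as a suffix; the real expansion
  # rule may differ.
  def build
    settings = @base.dup
    instance_id = settings[:"group.instance.id"]
    settings[:"group.instance.id"] = "#{instance_id}_#{@node_id}" if @node_id && instance_id
    settings
  end
end

settings = RefreshableSettings.new({ :"group.instance.id" => "worker" })
settings.refresh # no node yet, so nothing changes
settings.node_id = 2
settings.refresh # node present, so the settings are rebuilt
puts settings.kafka[:"group.instance.id"]
```

Rebuilding from the untouched base settings, rather than mutating the current hash, keeps repeated refreshes from stacking suffixes in this sketch.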

#subscriptions ⇒ false, Array<String>

Note:

Most of the time the list would not include inactive topics anyway, but with pattern matching the matcher topics become inactive down the road, so we filter them out here and they are later removed.

Returns names of topics to which we should subscribe or false when operating only on direct assignments.

Returns:

  • (false, Array<String>)

    names of topics to which we should subscribe or false when operating only on direct assignments



# File 'lib/karafka/routing/subscription_group.rb', line 94

def subscriptions
  topics.select(&:active?).map(&:subscription_name)
end
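
The filtering described in the note can be shown with plain Ruby objects. This is a minimal sketch: `FakeTopic` and `subscriptions_for` are hypothetical stand-ins that expose only the `active?` and `subscription_name` methods the code above relies on.

```ruby
# Hypothetical topic stand-in with just the two methods used above.
FakeTopic = Struct.new(:subscription_name, :active) do
  def active?
    active
  end
end

# Same select-then-map chain as in the method above.
def subscriptions_for(topics)
  topics.select(&:active?).map(&:subscription_name)
end

topics = [
  FakeTopic.new("orders", true),
  FakeTopic.new("legacy_orders", false),
  FakeTopic.new("payments", true)
]

p subscriptions_for(topics) # prints ["orders", "payments"]
```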

#to_s ⇒ String

Note:

This is an alias for displaying in places where we print the stringified version.

Returns id of the subscription group.

Returns:

  • (String)

    id of the subscription group



# File 'lib/karafka/routing/subscription_group.rb', line 107

def to_s
  id
end