Class: Karafka::Routing::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/routing/topic.rb

Overview

Note:

‘#group` is the polymorphic reference to the owning group. Today this is always a ConsumerGroup, but the accessor is named generically in preparation for additional group types (e.g. KIP-932 share groups). `#consumer_group` is kept as an alias for backwards compatibility.

Topic stores all the details on how we should interact with Kafka given topic. It belongs to a group as from 0.6 all the topics can work in the same group It is a part of Karafka’s DSL.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, group) ⇒ Topic

Returns a new instance of Topic.

Parameters:

  • name (String, Symbol)

    name of a topic on which we want to listen

  • group (Karafka::Routing::ConsumerGroup)

    owning group of this topic. Polymorphic placeholder for future group types (e.g. share groups); today always a ConsumerGroup.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/karafka/routing/topic.rb', line 45

def initialize(name, group)
  @name = name.to_s
  @group = group
  @attributes = {}
  @active = true
  # @note We use identifier related to the group that owns a topic, because from Karafka 0.6
  #   we can handle multiple Kafka instances with the same process and we can have same
  #   topic name across multiple groups
  @id = "#{group.id}_#{@name}"
  @consumer = nil
  @active_assigned = false
  @subscription_group_details = nil

  INHERITABLE_ATTRIBUTES.each do |attribute|
    instance_variable_set("@#{attribute}", nil)
  end
end

Instance Attribute Details

#consumerClass

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/karafka/routing/topic.rb', line 96

def consumer
  if consumer_persistence
    # When persistence of consumers is on, no need to reload them
    @consumer
  else
    # In order to support code reload without having to change the topic api, we re-fetch the
    # class of a consumer based on its class name. This will support all the cases where the
    # consumer class is defined with a name. It won't support code reload for anonymous
    # consumer classes, but this is an edge case
    begin
      Object.const_get(@consumer.to_s)
    rescue NameError
      # It will only fail if the in case of anonymous classes
      @consumer
    end
  end
end

#groupObject (readonly) Also known as: consumer_group

Returns the value of attribute group.



14
15
16
# File 'lib/karafka/routing/topic.rb', line 14

def group
  @group
end

#idObject (readonly)

Returns the value of attribute id.



14
15
16
# File 'lib/karafka/routing/topic.rb', line 14

def id
  @id
end

#nameObject (readonly)

Returns the value of attribute name.



14
15
16
# File 'lib/karafka/routing/topic.rb', line 14

def name
  @name
end

#subscription_groupObject

Full subscription group reference can be built only when we have knowledge about the whole routing tree, this is why it is going to be set later on



26
27
28
# File 'lib/karafka/routing/topic.rb', line 26

def subscription_group
  @subscription_group
end

#subscription_group_detailsObject

Returns the value of attribute subscription_group_details.



22
23
24
# File 'lib/karafka/routing/topic.rb', line 22

def subscription_group_details
  @subscription_group_details
end

Instance Method Details

#active(active) ⇒ Object

Allows to disable topic by invoking this method and setting it to ‘false`.

Parameters:

  • active (Boolean)

    should this topic be consumed or not



116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/karafka/routing/topic.rb', line 116

def active(active)
  # Do not allow for active overrides. Basically if this is set on the topic level, defaults
  # will not overwrite it and this is desired. Otherwise because of the fact that this is
  # not a full feature config but just a flag, default value would always overwrite the
  # per-topic config since defaults application happens after the topic config block
  unless @active_assigned
    @active = active
    @active_assigned = true
  end

  @active
end

#active?Boolean

Returns should this topic be in use.

Returns:

  • (Boolean)

    should this topic be in use



139
140
141
142
143
144
# File 'lib/karafka/routing/topic.rb', line 139

def active?
  # Never active if disabled via routing
  return false unless @active

  Karafka::App.config.internal.routing.activity_manager.active?(:topics, name)
end

#consumer_classClass

Note:

This is just an alias to the ‘#consumer` method. We however want to use it internally instead of referencing the `#consumer`. We use this to indicate that this method returns class and not an instance. In the routing we want to keep the `#consumer Consumer` routing syntax, but for references outside, we should use this one.

Returns consumer class that we should use.

Returns:

  • (Class)

    consumer class that we should use



134
135
136
# File 'lib/karafka/routing/topic.rb', line 134

def consumer_class
  consumer
end

#kafka=(settings = {}) ⇒ Object

Note:

It is set to ‘false` by default to preserve backwards compatibility

Often users want to have the same basic cluster setup with small setting alterations This method allows us to do so by setting ‘inherit` to `true`. Whe inherit is enabled, settings will be merged with defaults.

Parameters:

  • settings (Hash) (defaults to: {})

    kafka scope settings. If ‘:inherit` key is provided, it will instruct the assignment to merge with root level defaults



84
85
86
87
88
# File 'lib/karafka/routing/topic.rb', line 84

def kafka=(settings = {})
  inherit = settings.delete(:inherit)

  @kafka = inherit ? Karafka::App.config.kafka.merge(settings) : settings
end

#subscription_nameString

Returns name of subscription that will go to librdkafka.

Returns:

  • (String)

    name of subscription that will go to librdkafka



91
92
93
# File 'lib/karafka/routing/topic.rb', line 91

def subscription_name
  name
end

#to_hHash

Note:

This is being used when we validate the group and its topics

Returns hash with all the topic attributes.

Returns:

  • (Hash)

    hash with all the topic attributes



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/karafka/routing/topic.rb', line 148

def to_h
  map = INHERITABLE_ATTRIBUTES.map do |attribute|
    [attribute, public_send(attribute)]
  end

  map.to_h.merge!(
    id: id,
    name: name,
    active: active?,
    consumer: consumer,
    group_id: group.id,
    # Kept as a reference alongside `group_id` for backwards compatibility. Will be removed
    # in Karafka 3.0.
    consumer_group_id: group.id,
    subscription_group_details: subscription_group_details
  ).freeze
end