Class: Karafka::Routing::Topic
- Inherits:
-
Object
- Object
- Karafka::Routing::Topic
- Defined in:
- lib/karafka/routing/topic.rb
Overview
‘#group` is the polymorphic reference to the owning group. Today this is always a ConsumerGroup, but the accessor is named generically in preparation for additional group types (e.g. KIP-932 share groups). `#consumer_group` is kept as an alias for backwards compatibility.
Topic stores all the details on how we should interact with Kafka given topic. It belongs to a group as from 0.6 all the topics can work in the same group It is a part of Karafka’s DSL.
Instance Attribute Summary collapse
-
#consumer ⇒ Class
Consumer class that we should use.
-
#group ⇒ Object
(also: #consumer_group)
readonly
Returns the value of attribute group.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#subscription_group ⇒ Object
Full subscription group reference can be built only when we have knowledge about the whole routing tree, this is why it is going to be set later on.
-
#subscription_group_details ⇒ Object
Returns the value of attribute subscription_group_details.
Instance Method Summary collapse
-
#active(active) ⇒ Object
Allows to disable topic by invoking this method and setting it to ‘false`.
-
#active? ⇒ Boolean
Should this topic be in use.
-
#consumer_class ⇒ Class
Consumer class that we should use.
-
#initialize(name, group) ⇒ Topic
constructor
A new instance of Topic.
-
#kafka=(settings = {}) ⇒ Object
Often users want to have the same basic cluster setup with small setting alterations This method allows us to do so by setting ‘inherit` to `true`.
-
#subscription_name ⇒ String
Name of subscription that will go to librdkafka.
-
#to_h ⇒ Hash
Hash with all the topic attributes.
Constructor Details
#initialize(name, group) ⇒ Topic
Returns a new instance of Topic.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/karafka/routing/topic.rb', line 45 def initialize(name, group) @name = name.to_s @group = group @attributes = {} @active = true # @note We use identifier related to the group that owns a topic, because from Karafka 0.6 # we can handle multiple Kafka instances with the same process and we can have same # topic name across multiple groups @id = "#{group.id}_#{@name}" @consumer = nil @active_assigned = false @subscription_group_details = nil INHERITABLE_ATTRIBUTES.each do |attribute| instance_variable_set("@#{attribute}", nil) end end |
Instance Attribute Details
#consumer ⇒ Class
Returns consumer class that we should use.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/karafka/routing/topic.rb', line 96 def consumer if consumer_persistence # When persistence of consumers is on, no need to reload them @consumer else # In order to support code reload without having to change the topic api, we re-fetch the # class of a consumer based on its class name. This will support all the cases where the # consumer class is defined with a name. It won't support code reload for anonymous # consumer classes, but this is an edge case begin Object.const_get(@consumer.to_s) rescue NameError # It will only fail if the in case of anonymous classes @consumer end end end |
#group ⇒ Object (readonly) Also known as: consumer_group
Returns the value of attribute group.
14 15 16 |
# File 'lib/karafka/routing/topic.rb', line 14 def group @group end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
14 15 16 |
# File 'lib/karafka/routing/topic.rb', line 14 def id @id end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
14 15 16 |
# File 'lib/karafka/routing/topic.rb', line 14 def name @name end |
#subscription_group ⇒ Object
Full subscription group reference can be built only when we have knowledge about the whole routing tree, this is why it is going to be set later on
26 27 28 |
# File 'lib/karafka/routing/topic.rb', line 26 def subscription_group @subscription_group end |
#subscription_group_details ⇒ Object
Returns the value of attribute subscription_group_details.
22 23 24 |
# File 'lib/karafka/routing/topic.rb', line 22 def subscription_group_details @subscription_group_details end |
Instance Method Details
#active(active) ⇒ Object
Allows to disable topic by invoking this method and setting it to ‘false`.
116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/karafka/routing/topic.rb', line 116 def active(active) # Do not allow for active overrides. Basically if this is set on the topic level, defaults # will not overwrite it and this is desired. Otherwise because of the fact that this is # not a full feature config but just a flag, default value would always overwrite the # per-topic config since defaults application happens after the topic config block unless @active_assigned @active = active @active_assigned = true end @active end |
#active? ⇒ Boolean
Returns should this topic be in use.
139 140 141 142 143 144 |
# File 'lib/karafka/routing/topic.rb', line 139 def active? # Never active if disabled via routing return false unless @active Karafka::App.config.internal.routing.activity_manager.active?(:topics, name) end |
#consumer_class ⇒ Class
This is just an alias to the ‘#consumer` method. We however want to use it internally instead of referencing the `#consumer`. We use this to indicate that this method returns class and not an instance. In the routing we want to keep the `#consumer Consumer` routing syntax, but for references outside, we should use this one.
Returns consumer class that we should use.
134 135 136 |
# File 'lib/karafka/routing/topic.rb', line 134 def consumer_class consumer end |
#kafka=(settings = {}) ⇒ Object
It is set to ‘false` by default to preserve backwards compatibility
Often users want to have the same basic cluster setup with small setting alterations This method allows us to do so by setting ‘inherit` to `true`. Whe inherit is enabled, settings will be merged with defaults.
84 85 86 87 88 |
# File 'lib/karafka/routing/topic.rb', line 84 def kafka=(settings = {}) inherit = settings.delete(:inherit) @kafka = inherit ? Karafka::App.config.kafka.merge(settings) : settings end |
#subscription_name ⇒ String
Returns name of subscription that will go to librdkafka.
91 92 93 |
# File 'lib/karafka/routing/topic.rb', line 91 def subscription_name name end |
#to_h ⇒ Hash
This is being used when we validate the group and its topics
Returns hash with all the topic attributes.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/karafka/routing/topic.rb', line 148 def to_h map = INHERITABLE_ATTRIBUTES.map do |attribute| [attribute, public_send(attribute)] end map.to_h.merge!( id: id, name: name, active: active?, consumer: consumer, group_id: group.id, # Kept as a reference alongside `group_id` for backwards compatibility. Will be removed # in Karafka 3.0. consumer_group_id: group.id, subscription_group_details: subscription_group_details ).freeze end |