Class: Karafka::Instrumentation::AssignmentsTracker
- Inherits: Object
- Includes: Singleton
- Defined in: lib/karafka/instrumentation/assignments_tracker.rb
Overview
Keeps track of active assignments and materializes them by returning the routing topics with the appropriate partitions assigned at a given moment.
It is auto-subscribed as part of Karafka itself.
It is not computationally heavy, as it only operates during rebalances.
We keep assignments as a flat topic structure because we can go from topics to both subscription and consumer groups if needed.
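The flat topic structure described above can be sketched as follows. Note that `TrackedTopic` is a hypothetical stand-in for Karafka's routing topic objects, used here only to show that each key still carries a reference back to its subscription group:

```ruby
# Hypothetical stand-in for a routing topic; not Karafka's real class.
TrackedTopic = Struct.new(:name, :subscription_group)

topic = TrackedTopic.new('orders', 'sg_main')

# Flat mapping: topic object => sorted array of assigned partition ids
assignments = Hash.new { |hash, key| hash[key] = [] }
assignments[topic] += [2, 0]
assignments[topic].sort!

assignments[topic]                        # => [0, 2]
# The subscription group is still reachable from the flat structure:
assignments.keys.map(&:subscription_group) # => ["sg_main"]
```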
Class Method Summary
- .current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
- .generation(topic, partition) ⇒ Integer
- .generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}
Instance Method Summary
-
#clear ⇒ Object
Clears all the assignments and generations.
-
#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Returns all the active/current assignments of this process.
-
#generation(topic, partition) ⇒ Integer
Returns the generation count for a specific topic-partition.
-
#generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}
Returns the generation counts for all partitions that have ever been assigned.
-
#initialize ⇒ AssignmentsTracker
constructor
Initializes the assignments tracker with empty assignments.
-
#inspect ⇒ String
Thread-safe and lock-safe inspect implementation.
-
#on_client_events_poll(event) ⇒ Object
Handles the events_poll notification to detect assignment loss. This is called regularly (every tick_interval), so we check whether the assignment was lost.
-
#on_client_reset(event) ⇒ Object
When the client is under reset due to critical issues, removes all of its assignments, as we will get a new set of assignments.
-
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash.
-
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash.
Constructor Details
#initialize ⇒ AssignmentsTracker
Initializes the assignments tracker with empty assignments
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 40

def initialize
  @mutex = Mutex.new
  @assignments = Hash.new { |hash, key| hash[key] = [] }
  @generations = Hash.new { |h, k| h[k] = {} }
end
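The constructor relies on `Hash.new` with a default block, which lazily creates an empty container the first time an unknown topic is accessed. A minimal sketch of that behavior:

```ruby
# Same default-block pattern as in the constructor above
assignments = Hash.new { |hash, key| hash[key] = [] }
generations = Hash.new { |h, k| h[k] = {} }

# First access to an unknown key materializes the default container,
# so no nil checks are needed before appending or nesting.
assignments['orders'] << 0
generations['orders'][0] = 1

assignments # => {"orders"=>[0]}
generations # => {"orders"=>{0=>1}}
```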
Class Method Details
.current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 20

def current
  instance.current
end
.generation(topic, partition) ⇒ Integer
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 34

def generation(topic, partition)
  instance.generation(topic, partition)
end
.generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 26

def generations
  instance.generations
end
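All three class methods simply delegate to the Singleton instance. A self-contained sketch of this delegation pattern (with a hypothetical `MiniTracker`, not the real class):

```ruby
require 'singleton'

# Hypothetical minimal tracker illustrating class-level delegation
# to a Singleton instance, as the class methods above do.
class MiniTracker
  include Singleton

  def initialize
    @assignments = {}
  end

  def current
    @assignments.dup
  end

  class << self
    # Class-level convenience wrapper delegating to the single instance
    def current
      instance.current
    end
  end
end

MiniTracker.current # => {}
```

This keeps the call sites short (`MiniTracker.current`) while all state lives on exactly one instance.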
Instance Method Details
#clear ⇒ Object
Clears all the assignments and generations.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 101

def clear
  @mutex.synchronize do
    @assignments.clear
    @generations.clear
  end
end
#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}
Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.
We return a copy because we modify the internals and do not want users to tamper with the data accidentally.
Returns all the active/current assignments of this process.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 55

def current
  assignments = {}

  # Since the `@assignments` state can change during a rebalance, if we would iterate over
  # it exactly during state change, we would end up with the following error:
  #   RuntimeError: can't add a new key into hash during iteration
  @mutex.synchronize do
    @assignments.each do |topic, partitions|
      assignments[topic] = partitions.dup.freeze
    end
  end

  assignments.freeze
end
#generation(topic, partition) ⇒ Integer
Returns the generation count for a specific topic-partition
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 94

def generation(topic, partition)
  @mutex.synchronize do
    @generations.dig(topic, partition) || 0
  end
end
#generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}
Returns a frozen deep copy to prevent external mutation
Returns the generation counts for all partitions that have ever been assigned
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 77

def generations
  result = {}

  @mutex.synchronize do
    @generations.each do |topic, partitions|
      result[topic] = partitions.dup.freeze
    end
  end

  result.freeze
end
#inspect ⇒ String
Returns thread-safe and lock-safe inspect implementation.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 109

def inspect
  info =
    if @mutex.try_lock
      begin
        assignments = @assignments.dup.transform_keys(&:name).inspect
        "assignments=#{assignments}"
      ensure
        @mutex.unlock
      end
    else
      "busy"
    end

  "#<#{self.class.name} #{info}>"
end
#on_client_events_poll(event) ⇒ Object
We can run the `#assignment_lost?` check on each events poll because polls happen once every 5 seconds during processing, plus prior to each messages poll. It takes 0.6 microseconds per call.
Handles the events_poll notification to detect assignment loss. This is called regularly (every tick_interval), so we check whether the assignment was lost.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 144

def on_client_events_poll(event)
  client = event[:caller]

  # Only clear assignments if they were actually lost
  return unless client.assignment_lost?

  # Cleaning happens the same way as with the consumer reset
  on_client_reset(event)
end
#on_client_reset(event) ⇒ Object
When the client is under reset due to critical issues, removes all of its assignments, as we will get a new set of assignments.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 127

def on_client_reset(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    @assignments.delete_if do |topic, _partitions|
      topic.subscription_group.id == sg.id
    end
  end
end
#on_rebalance_partitions_assigned(event) ⇒ Object
Adds partitions to the current assignments hash.
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 175

def on_rebalance_partitions_assigned(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      partition_ids = []

      partitions.each do |partition|
        partition_id = partition.partition

        partition_ids << partition_id

        @generations[topic][partition_id] ||= 0
        @generations[topic][partition_id] += 1
      end

      @assignments[topic] += partition_ids
      @assignments[topic].sort!
    end
  end
end
#on_rebalance_partitions_revoked(event) ⇒ Object
Removes partitions from the current assignments hash
# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 157

def on_rebalance_partitions_revoked(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] -= partitions.map(&:partition)
      @assignments[topic].sort!

      # Remove completely topics for which we do not have any assignments left
      @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
    end
  end
end