Class: Karafka::Instrumentation::AssignmentsTracker

Inherits:
Object
Includes:
Singleton
Defined in:
lib/karafka/instrumentation/assignments_tracker.rb

Overview

Keeps track of active assignments and materializes them by returning the routing topics together with the partitions assigned to them at a given moment.

It is auto-subscribed as part of Karafka itself.

It is computationally cheap, as it only modifies its state during rebalances and client resets.

We keep assignments as a flat, topic-keyed structure because from a topic we can reach both its subscription group and its consumer group if needed.
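A minimal sketch of why the flat layout is enough. It assumes hypothetical `Struct` stand-ins (`DemoTopic`, `DemoSubscriptionGroup`, `DemoConsumerGroup`) in place of Karafka's routing classes; per-group views are derived from the topic keys on demand instead of being stored separately.

```ruby
# Hypothetical stand-ins for Karafka routing objects; the real classes are richer.
DemoConsumerGroup = Struct.new(:id)
DemoSubscriptionGroup = Struct.new(:id, :consumer_group)
DemoTopic = Struct.new(:name, :subscription_group) do
  # A topic reaches its consumer group through its subscription group
  def consumer_group
    subscription_group.consumer_group
  end
end

cg = DemoConsumerGroup.new("cg1")
sg_a = DemoSubscriptionGroup.new("sg_a", cg)
sg_b = DemoSubscriptionGroup.new("sg_b", cg)

# Flat structure: topic => assigned partition ids
assignments = {
  DemoTopic.new("orders", sg_a) => [0, 1],
  DemoTopic.new("payments", sg_b) => [2]
}

# Per-group views are computed from the topic keys when needed
by_subscription_group = assignments.group_by { |topic, _partitions| topic.subscription_group.id }
by_consumer_group = assignments.group_by { |topic, _partitions| topic.consumer_group.id }

by_subscription_group.each do |sg_id, pairs|
  puts "#{sg_id}: #{pairs.map { |topic, _partitions| topic.name }.join(', ')}"
end
```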

Class Method Summary

Instance Method Summary

Constructor Details

#initialize ⇒ AssignmentsTracker

Initializes the assignments tracker with empty assignments and generations.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 40

def initialize
  @mutex = Mutex.new
  @assignments = Hash.new { |hash, key| hash[key] = [] }
  @generations = Hash.new { |h, k| h[k] = {} }
end
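Both hashes use Ruby's default-value block, which stores a fresh value the first time a missing key is read. This is what lets the rebalance handlers write `@assignments[topic] += ...` and `@generations[topic][partition_id] ||= 0` without checking whether the topic exists yet. A minimal plain-Ruby demonstration, assuming string keys in place of routing topics:

```ruby
# Same construction as in #initialize: missing keys are created on first read
assignments = Hash.new { |hash, key| hash[key] = [] }
generations = Hash.new { |h, k| h[k] = {} }

# The read inside += stores [] under "orders" before the sum is assigned back
assignments["orders"] += [3, 1]
assignments["orders"].sort!

# Nested write works because generations["orders"] is auto-created as {}
generations["orders"][3] ||= 0
generations["orders"][3] += 1

# Caveat of this construction: a plain read also inserts the key with an empty value
assignments["payments"]

puts assignments.inspect
puts generations.inspect
```

The caveat on the last read is why code built on such hashes usually needs a cleanup step for empty entries, as the revocation handler does with `delete_if`.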

Class Method Details

.current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}

Returns:

  • (Hash{Karafka::Routing::Topic => Array<Integer>})

See Also:



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 20

def current
  instance.current
end

.generation(topic, partition) ⇒ Integer

Parameters:

  • topic (Karafka::Routing::Topic)
  • partition (Integer)

Returns:

  • (Integer)

See Also:



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 34

def generation(topic, partition)
  instance.generation(topic, partition)
end

.generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}

Returns:

  • (Hash{Karafka::Routing::Topic => Hash{Integer => Integer}})

See Also:



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 26

def generations
  instance.generations
end

Instance Method Details

#clear ⇒ Object

Clears all the assignments and generations



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 101

def clear
  @mutex.synchronize do
    @assignments.clear
    @generations.clear
  end
end

#current ⇒ Hash{Karafka::Routing::Topic => Array<Integer>}

Note:

Keep in mind that these assignments can change at any time, especially when working with multiple consumer groups or subscription groups.

Note:

We return a frozen copy because the internals are modified during rebalances and we do not want the user to tamper with the data accidentally.

Returns all the active (current) assignments of this process.

Returns:

  • (Hash{Karafka::Routing::Topic => Array<Integer>})



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 55

def current
  assignments = {}

  # Since the `@assignments` state can change during a rebalance, if we iterated over
  # it exactly during a state change, we would end up with the following error:
  #   RuntimeError: can't add a new key into hash during iteration
  @mutex.synchronize do
    @assignments.each do |topic, partitions|
      assignments[topic] = partitions.dup.freeze
    end
  end

  assignments.freeze
end
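A minimal sketch of the snapshot pattern used by `#current`, assuming plain string keys instead of routing topics: the live hash is copied under a mutex, the copies are frozen, and later changes to the live state do not leak into the snapshot. The last part reproduces, in a single thread, the iteration error that the mutex guards against.

```ruby
mutex = Mutex.new
state = Hash.new { |hash, key| hash[key] = [] }
state["orders"] += [0, 1]

# Copy under the lock, freezing both the per-topic arrays and the outer hash
snapshot = mutex.synchronize do
  copy = {}
  state.each { |topic, partitions| copy[topic] = partitions.dup.freeze }
  copy.freeze
end

# Later changes to the live state are not visible in the snapshot
mutex.synchronize { state["orders"] << 2 }

# Callers cannot mutate the snapshot by accident
frozen_error = begin
  snapshot["orders"] << 3
  nil
rescue FrozenError => e
  e
end
puts "snapshot is read-only (#{frozen_error.class})"

# Adding a key to a hash while it is being iterated raises; holding the mutex
# in #current keeps rebalance callbacks from doing this mid-iteration
iteration_error = begin
  state.each { |_topic, _partitions| state["payments"] = [5] }
  nil
rescue RuntimeError => e
  e
end
puts iteration_error.message
```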

#generation(topic, partition) ⇒ Integer

Returns the generation count for a specific topic-partition

Parameters:

  • topic (Karafka::Routing::Topic)
  • partition (Integer)

Returns:

  • (Integer)

    generation count (0 if never assigned, 1+ otherwise)



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 94

def generation(topic, partition)
  @mutex.synchronize do
    @generations.dig(topic, partition) || 0
  end
end
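`#generation` falls back to 0 for partitions that were never assigned. The sketch below reproduces that lookup in plain Ruby, assuming string topic names and a hypothetical `generation_for` helper. It uses `Hash#fetch` rather than `Hash#dig` because `dig` reads through `[]`, which on a hash built with a default block would also store an empty entry for an unknown topic; `fetch` with a default argument keeps the lookup read-only.

```ruby
generations = Hash.new { |h, k| h[k] = {} }
generations["orders"][0] = 2

# Hypothetical helper mirroring #generation: 0 when the partition was never assigned
def generation_for(generations, topic, partition)
  generations.fetch(topic, {}).fetch(partition, 0)
end

puts generation_for(generations, "orders", 0)   # => 2 (known partition)
puts generation_for(generations, "orders", 7)   # => 0 (never assigned)
puts generation_for(generations, "payments", 0) # => 0 (unknown topic)
```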

#generations ⇒ Hash{Karafka::Routing::Topic => Hash{Integer => Integer}}

Note:

Returns a frozen deep copy to prevent external mutation

Returns the generation counts for all partitions that have ever been assigned

Returns:

  • (Hash{Karafka::Routing::Topic => Hash{Integer => Integer}})

    topic to partition generation mapping. Generation starts at 1 on first assignment and increments on each reassignment. Revoked partitions remain in the hash with their last generation value.



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 77

def generations
  result = {}

  @mutex.synchronize do
    @generations.each do |topic, partitions|
      result[topic] = partitions.dup.freeze
    end
  end

  result.freeze
end
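The generation semantics described above (1 on first assignment, incremented on every reassignment, last value kept after revocation) can be replayed with a small class. `GenerationLog` and its methods are hypothetical names for illustration, not part of Karafka, and plain strings stand in for routing topics.

```ruby
# Hypothetical mini tracker (not Karafka's API) replaying assign/revoke events
class GenerationLog
  def initialize
    @assigned = Hash.new { |h, k| h[k] = [] }
    @generations = Hash.new { |h, k| h[k] = {} }
  end

  # Every assignment bumps the counter: 1 on the first one, +1 on each reassignment
  def assign(topic, partitions)
    partitions.each do |partition|
      @generations[topic][partition] ||= 0
      @generations[topic][partition] += 1
    end
    @assigned[topic] = (@assigned[topic] + partitions).sort
  end

  # Revocation only shrinks the assignments; generation values are left untouched
  def revoke(topic, partitions)
    @assigned[topic] -= partitions
    @assigned.delete_if { |_topic, current| current.empty? }
  end

  # Frozen copies, in the spirit of #current and #generations
  def assigned
    @assigned.transform_values { |partitions| partitions.dup.freeze }.freeze
  end

  def generations
    @generations.transform_values { |partitions| partitions.dup.freeze }.freeze
  end
end

log = GenerationLog.new
log.assign("orders", [0, 1])
log.revoke("orders", [1])
log.assign("orders", [1])    # partition 1 returns, so its generation becomes 2
log.revoke("orders", [0, 1]) # everything revoked, the topic leaves the assignments

p log.assigned
p log.generations
```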

#inspect ⇒ String

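Returns a thread-safe and lock-safe inspect representation. It never blocks on the internal mutex: if the mutex is already held, the assignments are replaced with `busy`.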

Returns:

  • (String)

    thread-safe and lock-safe inspect implementation



# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 109

def inspect
  info = if @mutex.try_lock
    begin
      assignments = @assignments.dup.transform_keys(&:name).inspect
      "assignments=#{assignments}"
    ensure
      @mutex.unlock
    end
  else
    "busy"
  end

  "#<#{self.class.name} #{info}>"
end
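A minimal sketch of the non-blocking `try_lock` pattern, using a hypothetical `LockSafeBox` class. It shows the "lock-safe" part concretely: `Mutex#try_lock` returns `false` when the mutex is held, including by the calling thread, so calling `inspect` while holding the lock yields `busy` instead of a recursive-locking `ThreadError` that a plain `synchronize` would raise.

```ruby
# Hypothetical class reproducing the non-blocking inspect pattern
class LockSafeBox
  # Exposed only so the example below can hold the lock from outside
  attr_reader :mutex

  def initialize
    @mutex = Mutex.new
    @items = { "orders" => [0, 1] }
  end

  # Never waits on the mutex: reports "busy" instead of blocking or raising
  def inspect
    info = if @mutex.try_lock
      begin
        "items=#{@items.dup.inspect}"
      ensure
        @mutex.unlock
      end
    else
      "busy"
    end

    "#<#{self.class.name} #{info}>"
  end
end

box = LockSafeBox.new
puts box.inspect

# The same thread already holds the lock here, so try_lock returns false
nested = box.mutex.synchronize { box.inspect }
puts nested
```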

#on_client_events_poll(event) ⇒ Object

Note:

We can run `#assignment_lost?` on each events poll because events polls happen once every 5 seconds during processing, plus prior to each messages poll. It takes 0.6 microseconds per call.

Handles the events_poll notification to detect assignment loss. It is called regularly (every tick_interval), so we check here whether the assignment was lost.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 144

def on_client_events_poll(event)
  client = event[:caller]

  # Only clear assignments if they were actually lost
  return unless client.assignment_lost?

  # Cleaning happens the same way as with the consumer reset
  on_client_reset(event)
end
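The handler is a cheap gate in front of the reset logic: the loss check runs on every poll, but the reset only runs when the check reports a loss. A minimal sketch, assuming a hypothetical `FakeClient` with an `assignment_lost?` method, a plain hash in place of `Karafka::Core::Monitoring::Event`, and a `PollWatcher` class that merely counts resets.

```ruby
# Hypothetical stand-in for a client that can report assignment loss
FakeClient = Struct.new(:lost) do
  def assignment_lost?
    lost
  end
end

# Minimal sketch of "check on every poll, reset only on loss"
class PollWatcher
  attr_reader :resets

  def initialize
    @resets = 0
  end

  def on_client_events_poll(event)
    return unless event[:caller].assignment_lost?

    on_client_reset(event)
  end

  # Counts resets instead of clearing real assignments
  def on_client_reset(_event)
    @resets += 1
  end
end

watcher = PollWatcher.new
3.times { watcher.on_client_events_poll({ caller: FakeClient.new(false) }) }
watcher.on_client_events_poll({ caller: FakeClient.new(true) })
puts watcher.resets
```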

#on_client_reset(event) ⇒ Object

When the client is being reset due to critical issues, removes all assignments of its subscription group, as we will get a new set of assignments.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 127

def on_client_reset(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    @assignments.delete_if do |topic, _partitions|
      topic.subscription_group.id == sg.id
    end
  end
end
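A reset only drops the topics that belong to the reset client's subscription group; topics of other subscription groups and all generation counters are left in place. A minimal sketch, assuming hypothetical `GroupRef` and `TopicRef` structs in place of Karafka's routing classes.

```ruby
# Hypothetical stand-ins for routing objects; only the fields used here exist
GroupRef = Struct.new(:id)
TopicRef = Struct.new(:name, :subscription_group)

sg_a = GroupRef.new("sg_a")
sg_b = GroupRef.new("sg_b")
orders = TopicRef.new("orders", sg_a)

assignments = {
  orders => [0, 1],
  TopicRef.new("events", sg_a) => [4],
  TopicRef.new("payments", sg_b) => [2]
}
generations = { orders => { 0 => 1, 1 => 1 } }

# The client of sg_a is reset: drop only that group's topics, as #on_client_reset does
reset_group = sg_a
assignments.delete_if { |topic, _partitions| topic.subscription_group.id == reset_group.id }

# Generation counters are not touched by a reset
puts assignments.keys.map(&:name).inspect
puts generations.size
```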

#on_rebalance_partitions_assigned(event) ⇒ Object

Adds partitions to the current assignments hash and increments their generation counts

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 175

def on_rebalance_partitions_assigned(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      partition_ids = []

      partitions.each do |partition|
        partition_id = partition.partition
        partition_ids << partition_id
        @generations[topic][partition_id] ||= 0
        @generations[topic][partition_id] += 1
      end

      @assignments[topic] += partition_ids
      @assignments[topic].sort!
    end
  end
end
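Because newly assigned partition ids are appended to the existing list (and then sorted) rather than replacing it, partitions that arrive in separate assignment events accumulate, for example with an incremental rebalance protocol. A minimal sketch of the handler body for an already-resolved topic, assuming a hypothetical `PartitionRef` struct that only exposes `#partition`, string topic names, and a hypothetical `apply_assignment` helper.

```ruby
# Hypothetical stand-in for partition entries; only #partition is used
PartitionRef = Struct.new(:partition)

assignments = Hash.new { |hash, key| hash[key] = [] }
generations = Hash.new { |h, k| h[k] = {} }

# Mirrors the per-topic body of #on_rebalance_partitions_assigned
def apply_assignment(assignments, generations, topic, partitions)
  partition_ids = partitions.map(&:partition)

  partition_ids.each do |partition_id|
    generations[topic][partition_id] ||= 0
    generations[topic][partition_id] += 1
  end

  assignments[topic] += partition_ids
  assignments[topic].sort!
end

# Two separate assignment events for the same topic
apply_assignment(assignments, generations, "orders", [PartitionRef.new(2)])
apply_assignment(assignments, generations, "orders", [PartitionRef.new(0), PartitionRef.new(1)])

puts assignments["orders"].inspect
```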

#on_rebalance_partitions_revoked(event) ⇒ Object

Removes partitions from the current assignments hash

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/assignments_tracker.rb', line 157

def on_rebalance_partitions_revoked(event)
  sg = event[:subscription_group]

  @mutex.synchronize do
    event[:tpl].to_h.each do |topic, partitions|
      topic = sg.topics.find(topic)

      @assignments[topic] -= partitions.map(&:partition)
      @assignments[topic].sort!
      # Remove completely topics for which we do not have any assignments left
      @assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
    end
  end
end
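Revocation subtracts the revoked partition ids and then removes any topic left with no partitions, so `#current` never reports a topic with an empty list. A minimal sketch, assuming a hypothetical `RevokedPartition` struct, string topic names, and a hypothetical `apply_revocation` helper that mirrors the per-topic body of the handler.

```ruby
# Hypothetical stand-in for partition entries; only #partition is used
RevokedPartition = Struct.new(:partition)

assignments = Hash.new { |hash, key| hash[key] = [] }
assignments["orders"] = [0, 1, 2]
assignments["payments"] = [5]

# Mirrors the per-topic body of #on_rebalance_partitions_revoked
def apply_revocation(assignments, topic, partitions)
  assignments[topic] -= partitions.map(&:partition)
  assignments[topic].sort!
  # Topics without any remaining partitions are dropped entirely
  assignments.delete_if { |_topic, cur_partitions| cur_partitions.empty? }
end

apply_revocation(assignments, "orders", [RevokedPartition.new(1)])
apply_revocation(assignments, "payments", [RevokedPartition.new(5)])

puts assignments.inspect
```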