Class: Racecar::RebalanceListener

Inherits:
Object
  • Object
show all
Defined in:
lib/racecar/rebalance_listener.rb

Defined Under Namespace

Classes: Event

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, instrumenter, partition_processors) ⇒ RebalanceListener

Returns a new instance of RebalanceListener.



3
4
5
6
7
8
9
# File 'lib/racecar/rebalance_listener.rb', line 3

def initialize(config, instrumenter, partition_processors)
  @consumer_class = config.consumer_class
  @config = config
  @instrumenter = instrumenter
  @partition_processors = partition_processors
  @rdkafka_consumer = nil
end

Instance Attribute Details

#rdkafka_consumer=(value) ⇒ Object

Sets the attribute rdkafka_consumer

Parameters:

  • value

    the value to set the attribute rdkafka_consumer to.



11
12
13
# File 'lib/racecar/rebalance_listener.rb', line 11

def rdkafka_consumer=(value)
  @rdkafka_consumer = value
end

Instance Method Details

#on_partitions_assigned(rdkafka_topic_partition_list) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/racecar/rebalance_listener.rb', line 16

def on_partitions_assigned(rdkafka_topic_partition_list)
  event = Event.new(rdkafka_consumer: rdkafka_consumer, rdkafka_topic_partition_list: rdkafka_topic_partition_list)

  instrument("partitions_assigned", partitions: event.partition_numbers) do
    consumer_class.on_partitions_assigned(event)
  end
end

#on_partitions_revoked(rdkafka_topic_partition_list) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/racecar/rebalance_listener.rb', line 24

def on_partitions_revoked(rdkafka_topic_partition_list)
  event = Event.new(rdkafka_consumer: rdkafka_consumer, rdkafka_topic_partition_list: rdkafka_topic_partition_list)

  instrument("partitions_revoked", partitions: event.partition_numbers) do
    consumer_class.on_partitions_revoked(event)
    rdkafka_topic_partition_list.to_h.each do |topic, |
      .flatten.map(&:partition).each do |partition|
        key = Runner.topic_partition_key(topic, partition)
        processor = @partition_processors[key]
        processor&.rebalance!
        @partition_processors.delete(key)
      end
    end
  end
end