Class: Karafka::Processing::ConsumerGroups::CoordinatorsBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/processing/consumer_groups/coordinators_buffer.rb

Overview

Note:

This buffer operates only from the listener loop, thus we do not have to make it thread-safe.

Coordinators builder used to build coordinators per topic partition

It provides direct pauses access for revocation

Instance Method Summary collapse

Constructor Details

#initialize(topics) ⇒ CoordinatorsBuffer

Returns a new instance of CoordinatorsBuffer.

Parameters:



20
21
22
23
24
# File 'lib/karafka/processing/consumer_groups/coordinators_buffer.rb', line 20

def initialize(topics)
  @pauses_manager = Connection::PausesManager.new
  @coordinators = Hash.new { |h, k| h[k] = {} }
  @topics = topics
end

Instance Method Details

#find_or_create(topic_name, partition) ⇒ Karafka::Processing::ConsumerGroups::Coordinator

Returns found or created coordinator.

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number

Returns:



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/karafka/processing/consumer_groups/coordinators_buffer.rb', line 29

def find_or_create(topic_name, partition)
  @coordinators[topic_name][partition] ||= begin
    routing_topic = @topics.find(topic_name)

    coordinator_class.new(
      routing_topic,
      partition,
      @pauses_manager.fetch(routing_topic, partition)
    )
  end
end

#resetObject

Clears coordinators and re-created the pauses manager This should be used only for critical errors recovery



62
63
64
65
# File 'lib/karafka/processing/consumer_groups/coordinators_buffer.rb', line 62

def reset
  @pauses_manager = Connection::PausesManager.new
  @coordinators.clear
end

#resume {|topic, partition| ... } ⇒ Object

Resumes processing of partitions for which pause time has ended.

Yield Parameters:

  • topic (String)

    name

  • partition (Integer)

    number



44
45
46
# File 'lib/karafka/processing/consumer_groups/coordinators_buffer.rb', line 44

def resume(&)
  @pauses_manager.resume(&)
end

#revoke(topic_name, partition) ⇒ Object

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number



50
51
52
53
54
55
56
57
58
# File 'lib/karafka/processing/consumer_groups/coordinators_buffer.rb', line 50

def revoke(topic_name, partition)
  return unless @coordinators[topic_name].key?(partition)

  # The fact that we delete here does not change the fact that the executor still holds the
  # reference to this coordinator. We delete it here, as we will no longer process any
  # new stuff with it and we may need a new coordinator if we regain this partition, but the
  # coordinator may still be in use
  @coordinators[topic_name].delete(partition).revoke
end