Class: Karafka::Processing::ConsumerGroups::ExecutorsBuffer

Inherits:
Object
Defined in:
lib/karafka/processing/consumer_groups/executors_buffer.rb

Overview

Buffer for the executors of a given subscription group. It builds executors on demand and caches them, so they can be reused instead of being created anew each time.

Constructor Details

#initialize(client, subscription_group) ⇒ ExecutorsBuffer

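Parameters:

  • client

    client passed on to every executor this buffer builds

  • subscription_group

    subscription group whose id is passed on to every executor this buffer builds
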
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 18

def initialize(client, subscription_group)
  @subscription_group = subscription_group
  @client = client
  # We need two layers here to keep track of topics, partitions and processing groups
  @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end
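
The nested default blocks are easier to follow in isolation. The sketch below rebuilds only the hash from the constructor; `build_nested_buffer` is a hypothetical helper name, and symbols stand in for real executors.

```ruby
# Hypothetical helper that builds the same structure as @buffer above:
# topic => partition => parallel key => executor.
def build_nested_buffer
  Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end

buffer = build_nested_buffer

# Reading a missing topic and partition creates both levels on the fly.
buffer["events"][0][0] = :executor_a
buffer["events"][0][1] = :executor_b

buffer["events"][0].values # both stand-in executors, in insertion order

# The innermost level is a plain hash, so a missing parallel key
# returns nil and creates nothing.
buffer["events"][0][99]
```

Only the first two levels create entries on read. The third level stays a plain hash, which is why `find_or_create` has to assign into it explicitly.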

Instance Method Details

#clear ⇒ Object

Clears the executors buffer. Useful when recovering from critical errors.

# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 88

def clear
  @buffer.clear
end

#each {|executor| ... } ⇒ Object

Iterates over all available executors and yields each one in turn.

Yield Parameters:

  • executor (Executor, Pro::Processing::ConsumerGroups::Executor)

    executor held in the buffer

# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 77

def each
  @buffer.each_value do |partitions|
    partitions.each_value do |executors|
      executors.each_value do |executor|
        yield(executor)
      end
    end
  end
end
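
To see the traversal order, the same three-level walk can be run over a plain hash. This is a sketch only: `each_executor` is a hypothetical free-standing helper, and symbols stand in for executors.

```ruby
# Hypothetical helper: walks a topic => partition => parallel key hash
# the same way #each does and yields only the innermost values.
def each_executor(buffer)
  buffer.each_value do |partitions|
    partitions.each_value do |executors|
      executors.each_value { |executor| yield(executor) }
    end
  end
end

buffer = {
  "events" => { 0 => { 0 => :a, 1 => :b }, 1 => { 0 => :c } },
  "logs"   => { 0 => { 0 => :d } }
}

collected = []
each_executor(buffer) { |executor| collected << executor }
# collected now holds every stand-in executor, grouped by topic,
# then by partition, in insertion order.
```

The block receives the executor alone; the topic and partition keys are never yielded.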

#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>

Finds all the executors available for a given topic partition

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

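Returns:

  • (Array<Executor, Pro::Processing::ConsumerGroups::Executor>)

    executors currently buffered for the topic partition; empty if there are none
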
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 68

def find_all(topic, partition)
  @buffer[topic][partition].values
end

#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>

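Finds all existing executors for a given topic partition, or creates one if none exist yet.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • coordinator

    coordinator passed to the executor if one has to be created

Returns:

  • (Array<Executor, Pro::Processing::ConsumerGroups::Executor>)

    the existing executors, or a one-element array holding the newly created executor
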
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 45

def find_all_or_create(topic, partition, coordinator)
  existing = find_all(topic, partition)

  return existing unless existing.empty?

  [find_or_create(topic, partition, 0, coordinator)]
end
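
A minimal sketch of the fallback, assuming a stand-in buffer: `FallbackSketchBuffer` is hypothetical, `Object.new` replaces the real executor, and the coordinator argument is ignored.

```ruby
# Stand-in buffer, not the Karafka class. Object.new replaces the real
# executor and the coordinator is ignored; both are assumptions.
class FallbackSketchBuffer
  def initialize
    @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
  end

  def find_or_create(topic, partition, parallel_key, _coordinator)
    @buffer[topic][partition][parallel_key] ||= Object.new
  end

  def find_all(topic, partition)
    @buffer[topic][partition].values
  end

  def find_all_or_create(topic, partition, coordinator)
    existing = find_all(topic, partition)
    return existing unless existing.empty?

    # Nothing buffered yet: create a single executor under parallel key 0
    [find_or_create(topic, partition, 0, coordinator)]
  end
end

buffer = FallbackSketchBuffer.new
first = buffer.find_all_or_create("events", 0, nil)  # creates one executor
buffer.find_or_create("events", 0, 1, nil)           # adds a second one
second = buffer.find_all_or_create("events", 0, nil) # returns both, creates none
```

The fallback always uses parallel key `0`, so a later `find_or_create` call with key `0` returns the same object.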

#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::ConsumerGroups::Executor

Finds or creates an executor based on the provided details

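Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • parallel_key

    key that tells apart executors working on the same topic partition

  • coordinator

    coordinator passed to the executor when it is created

Returns:

  • (Executor, Pro::Processing::ConsumerGroups::Executor)

    the cached executor for the given key, or a newly created one
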
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 32

def find_or_create(topic, partition, parallel_key, coordinator)
  @buffer[topic][partition][parallel_key] ||= executor_class.new(
    @subscription_group.id,
    @client,
    coordinator
  )
end
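
Because the lookup relies on `||=`, the constructor arguments matter only on the first call for a given key. The sketch below illustrates this with hypothetical stand-ins: `SketchExecutor` replaces the class returned by `executor_class`, and `MemoizingSketchBuffer` takes a group id directly instead of a subscription group.

```ruby
# Hypothetical stand-in for the executor class; it only records its arguments.
SketchExecutor = Struct.new(:group_id, :client, :coordinator)

# Hypothetical stand-in for the buffer, reduced to the memoizing lookup.
class MemoizingSketchBuffer
  def initialize(client, group_id)
    @client = client
    @group_id = group_id
    @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
  end

  def find_or_create(topic, partition, parallel_key, coordinator)
    @buffer[topic][partition][parallel_key] ||=
      SketchExecutor.new(@group_id, @client, coordinator)
  end
end

buffer = MemoizingSketchBuffer.new(:client, "group-1")
a = buffer.find_or_create("events", 0, 0, :coordinator_one)
b = buffer.find_or_create("events", 0, 0, :coordinator_two) # cached, argument unused
c = buffer.find_or_create("events", 0, 1, :coordinator_two) # new parallel key, new object
```

Here `a` and `b` are the same object and still hold `:coordinator_one`, while `c` is a separate executor.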

#revoke(topic, partition) ⇒ Object

Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 58

def revoke(topic, partition)
  @buffer[topic][partition].clear
end