Class: Karafka::Processing::ConsumerGroups::ExecutorsBuffer
- Inherits:
-
Object
- Object
- Karafka::Processing::ConsumerGroups::ExecutorsBuffer
- Defined in:
- lib/karafka/processing/consumer_groups/executors_buffer.rb
Overview
Buffer for executors of a given subscription group. It wraps around the concept of building and caching them, so we can re-use them instead of creating new each time.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears the executors buffer.
-
#each {|karafka, partition, given| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info.
-
#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>
Finds all the executors available for a given topic partition.
-
#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>
Finds all existing executors for given topic partition or creates one for it.
-
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::ConsumerGroups::Executor
Finds or creates an executor based on the provided details.
- #initialize(client, subscription_group) ⇒ ExecutorsBuffer constructor
-
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
Constructor Details
#initialize(client, subscription_group) ⇒ ExecutorsBuffer
18 19 20 21 22 23 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 18 def initialize(client, subscription_group) @subscription_group = subscription_group @client = client # We need two layers here to keep track of topics, partitions and processing groups @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } } end |
Instance Method Details
#clear ⇒ Object
Clears the executors buffer. Useful for critical errors recovery.
88 89 90 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 88 def clear @buffer.clear end |
#each {|karafka, partition, given| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info
77 78 79 80 81 82 83 84 85 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 77 def each @buffer.each_value do |partitions| partitions.each_value do |executors| executors.each_value do |executor| yield(executor) end end end end |
#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>
Finds all the executors available for a given topic partition
68 69 70 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 68 def find_all(topic, partition) @buffer[topic][partition].values end |
#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::ConsumerGroups::Executor>
Finds all existing executors for given topic partition or creates one for it
45 46 47 48 49 50 51 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 45 def find_all_or_create(topic, partition, coordinator) existing = find_all(topic, partition) return existing unless existing.empty? [find_or_create(topic, partition, 0, coordinator)] end |
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::ConsumerGroups::Executor
Finds or creates an executor based on the provided details
32 33 34 35 36 37 38 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 32 def find_or_create(topic, partition, parallel_key, coordinator) @buffer[topic][partition][parallel_key] ||= executor_class.new( @subscription_group.id, @client, coordinator ) end |
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages
58 59 60 |
# File 'lib/karafka/processing/consumer_groups/executors_buffer.rb', line 58 def revoke(topic, partition) @buffer[topic][partition].clear end |