Class: Karafka::Processing::ExecutorsBuffer
- Inherits:
 - 
      Object
      
        
- Object
 - Karafka::Processing::ExecutorsBuffer
 
 
- Defined in:
 - lib/karafka/processing/executors_buffer.rb
 
Overview
Buffer for executors of a given subscription group. It wraps around the concept of building and caching them, so we can re-use them instead of creating new each time.
Instance Method Summary collapse
- 
  
    
      #clear  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Clears the executors buffer.
 - 
  
    
      #each {|karafka, partition, given| ... } ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Iterates over all available executors and yields them together with topic and partition info.
 - 
  
    
      #find_all(topic, partition)  ⇒ Array<Executor, Pro::Processing::Executor> 
    
    
  
  
  
  
  
  
  
  
  
    
Finds all the executors available for a given topic partition.
 - 
  
    
      #find_all_or_create(topic, partition, coordinator)  ⇒ Array<Executor, Pro::Processing::Executor> 
    
    
  
  
  
  
  
  
  
  
  
    
Finds all existing executors for given topic partition or creates one for it.
 - 
  
    
      #find_or_create(topic, partition, parallel_key, coordinator)  ⇒ Executor, Pro::Processing::Executor 
    
    
  
  
  
  
  
  
  
  
  
    
Finds or creates an executor based on the provided details.
 - #initialize(client, subscription_group) ⇒ ExecutorsBuffer constructor
 - 
  
    
      #revoke(topic, partition)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
 
Constructor Details
#initialize(client, subscription_group) ⇒ ExecutorsBuffer
      15 16 17 18 19 20  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 15 def initialize(client, subscription_group) @subscription_group = subscription_group @client = client # We need two layers here to keep track of topics, partitions and processing groups @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } } end  | 
  
Instance Method Details
#clear ⇒ Object
Clears the executors buffer. Useful for critical errors recovery.
      85 86 87  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 85 def clear @buffer.clear end  | 
  
#each {|karafka, partition, given| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info
      74 75 76 77 78 79 80 81 82  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 74 def each @buffer.each_value do |partitions| partitions.each_value do |executors| executors.each_value do |executor| yield(executor) end end end end  | 
  
#find_all(topic, partition) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all the executors available for a given topic partition
      65 66 67  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 65 def find_all(topic, partition) @buffer[topic][partition].values end  | 
  
#find_all_or_create(topic, partition, coordinator) ⇒ Array<Executor, Pro::Processing::Executor>
Finds all existing executors for given topic partition or creates one for it
      42 43 44 45 46 47 48  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 42 def find_all_or_create(topic, partition, coordinator) existing = find_all(topic, partition) return existing unless existing.empty? [find_or_create(topic, partition, 0, coordinator)] end  | 
  
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor, Pro::Processing::Executor
Finds or creates an executor based on the provided details
      29 30 31 32 33 34 35  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 29 def find_or_create(topic, partition, parallel_key, coordinator) @buffer[topic][partition][parallel_key] ||= executor_class.new( @subscription_group.id, @client, coordinator ) end  | 
  
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages
      55 56 57  | 
    
      # File 'lib/karafka/processing/executors_buffer.rb', line 55 def revoke(topic, partition) @buffer[topic][partition].clear end  |