Class: Karafka::Connection::PausesManager
- Inherits:
 - Object
- Object
 - Karafka::Connection::PausesManager
- Defined in:
 - lib/karafka/connection/pauses_manager.rb
 
Overview
Abstraction layer for managing partition pauses. It aggregates the pause trackers for all the partitions that we're working with.
Instance Method Summary
- #fetch(topic, partition) ⇒ Karafka::TimeTrackers::Pause
  Creates or fetches the pause tracker of a given topic partition.
- #initialize ⇒ Karafka::Connection::PausesManager (constructor)
  Returns a new pauses manager.
- #resume {|topic, partition| ... } ⇒ Object
  Resumes processing of partitions for which the pause time has ended.
 
Constructor Details
#initialize ⇒ Karafka::Connection::PausesManager
Returns a new pauses manager.
      # File 'lib/karafka/connection/pauses_manager.rb', line 9
      def initialize
        @pauses = Hash.new do |h, k|
          h[k] = {}
        end
      end
  
Instance Method Details
#fetch(topic, partition) ⇒ Karafka::TimeTrackers::Pause
Creates or fetches the pause tracker of a given topic partition.
      # File 'lib/karafka/connection/pauses_manager.rb', line 20
      def fetch(topic, partition)
        @pauses[topic][partition] ||= TimeTrackers::Pause.new(
          timeout: topic.pause_timeout,
          max_timeout: topic.pause_max_timeout,
          exponential_backoff: topic.pause_with_exponential_backoff
        )
      end
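The create-or-fetch behaviour comes from combining a hash with a default block and `||=`. The sketch below reproduces only that pattern so it can run without Karafka. `PauseSettings`, `TopicStub` and `FetchSketch` are hypothetical stand-ins (they are not Karafka classes), and the timeout values are illustrative only.

```ruby
# Hypothetical stand-in for Karafka::TimeTrackers::Pause: it only stores its settings.
PauseSettings = Struct.new(:timeout, :max_timeout, :exponential_backoff, keyword_init: true)

# Hypothetical stand-in for a topic object exposing the three pause settings.
TopicStub = Struct.new(:name, :pause_timeout, :pause_max_timeout, :pause_with_exponential_backoff)

# Minimal sketch of the lookup pattern; not the library class itself.
class FetchSketch
  def initialize
    # First access to an unknown topic creates an empty partition => tracker hash.
    @pauses = Hash.new { |h, k| h[k] = {} }
  end

  def fetch(topic, partition)
    # ||= builds the tracker once and returns the stored one on later calls.
    @pauses[topic][partition] ||= PauseSettings.new(
      timeout: topic.pause_timeout,
      max_timeout: topic.pause_max_timeout,
      exponential_backoff: topic.pause_with_exponential_backoff
    )
  end
end

manager = FetchSketch.new
topic = TopicStub.new('events', 1_000, 30_000, true) # illustrative values

first = manager.fetch(topic, 0)
puts first.equal?(manager.fetch(topic, 0)) # prints true: same tracker object reused
puts first.equal?(manager.fetch(topic, 1)) # prints false: another partition gets its own
```

Because the tracker is memoized per topic and partition, any state it accumulates between calls stays attached to that partition.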
  
#resume {|topic, partition| ... } ⇒ Object
Resumes processing of partitions for which the pause time has ended.
      # File 'lib/karafka/connection/pauses_manager.rb', line 32
      def resume
        @pauses.each do |topic, partitions|
          partitions.each do |partition, pause|
            next unless pause.paused?
            next unless pause.expired?

            pause.resume

            yield(topic, partition)
          end
        end
      end
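To illustrate which partitions the block receives, here is a minimal, self-contained sketch of the same loop. `ExpiringPauseStub` and `ResumeSketch` are hypothetical stand-ins: the stub expires only when told to (via an assumed `expire` helper) instead of tracking real time, so the example stays deterministic.

```ruby
# Hypothetical pause tracker; it answers the same three calls the loop relies on
# (paused?, expired?, resume) but expires manually rather than by elapsed time.
class ExpiringPauseStub
  def initialize
    @paused = false
    @expired = false
  end

  def pause
    @paused = true
    @expired = false
  end

  # Simulates the pause timeout running out.
  def expire
    @expired = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end

  def expired?
    @expired
  end
end

# Minimal sketch of the manager; mirrors the structure of the listing above.
class ResumeSketch
  def initialize
    @pauses = Hash.new { |h, k| h[k] = {} }
  end

  def fetch(topic, partition)
    @pauses[topic][partition] ||= ExpiringPauseStub.new
  end

  def resume
    @pauses.each do |topic, partitions|
      partitions.each do |partition, pause|
        next unless pause.paused?   # never paused or already resumed
        next unless pause.expired?  # still inside its pause window

        pause.resume
        yield(topic, partition)
      end
    end
  end
end

manager = ResumeSketch.new
manager.fetch('events', 0).pause
manager.fetch('events', 1).pause
manager.fetch('events', 2)         # tracked but never paused
manager.fetch('events', 0).expire  # only partition 0's pause has run out

resumed = []
manager.resume { |topic, partition| resumed << [topic, partition] }
p resumed # prints [["events", 0]]
```

Only the partition that is both paused and expired is yielded. A second call to `resume` yields nothing for it, because its tracker is no longer paused.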