Class: Karafka::Pro::Processing::Filters::Delayer
Defined in: lib/karafka/pro/processing/filters/delayer.rb
 
Overview
A filter that delays processing by pausing until the time is right.
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #action ⇒ Symbol
  Action to take on post-filtering.
- #apply!(messages) ⇒ Object
  Removes too young messages.
- #initialize(delay) ⇒ Delayer (constructor)
  A new instance of Delayer.
- #timeout ⇒ Integer
  Timeout delay in ms.
 
Methods inherited from Base
#applied?, #mark_as_consumed?, #marking_method
Constructor Details
#initialize(delay) ⇒ Delayer
Returns a new instance of Delayer.
      # File 'lib/karafka/pro/processing/filters/delayer.rb', line 21
      def initialize(delay)
        super()
        @delay = delay
      end
  
Instance Method Details
#action ⇒ Symbol
Returns action to take on post-filtering.
      # File 'lib/karafka/pro/processing/filters/delayer.rb', line 61
      def action
        return :skip unless applied?

        timeout <= 0 ? :seek : :pause
      end
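The decision made by #action can be tried outside of Karafka with a minimal sketch. `post_filter_action` is a hypothetical helper introduced only for illustration; it takes the two inputs that #action reads (`applied?` and `timeout`) as plain arguments.

```ruby
# Illustrative helper, not part of Karafka: mirrors the branching in #action.
# applied    - whether the filter removed any message from the batch
# timeout_ms - remaining delay in milliseconds
def post_filter_action(applied, timeout_ms)
  # Nothing was filtered out, so no post-filtering action is needed
  return :skip unless applied

  # Messages were removed: seek if the delay has already elapsed, pause otherwise
  timeout_ms <= 0 ? :seek : :pause
end

post_filter_action(false, 0)    # => :skip
post_filter_action(true, 0)     # => :seek
post_filter_action(true, 1_500) # => :pause
```

Note that `:seek` is returned only when messages were removed but the remaining delay is already zero or negative by the time the action is evaluated.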
  
#apply!(messages) ⇒ Object
Removes too young messages.
      # File 'lib/karafka/pro/processing/filters/delayer.rb', line 30
      def apply!(messages)
        @applied = false
        @cursor = nil

        # Time on message is in seconds with ms precision, so we need to convert the ttl that
        # is in ms to this format
        border = ::Time.now.utc - @delay / 1_000.0

        messages.delete_if do |message|
          too_young = message.timestamp > border

          if too_young
            @applied = true

            @cursor ||= message
          end

          @applied
        end
      end
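To see the filtering rule in isolation, here is a minimal, self-contained sketch that does not load Karafka. `FakeMessage` and `drop_too_young!` are hypothetical names introduced for illustration, and `now` is passed in explicitly so the result is deterministic; the method above reads `::Time.now.utc` instead and operates on real message objects.

```ruby
# Hypothetical stand-in for a message; only the fields this sketch needs.
FakeMessage = Struct.new(:offset, :timestamp)

# Illustrative helper, not part of Karafka: mirrors the logic of #apply!.
# delay_ms is in milliseconds, timestamps are Time objects.
# Mutates `messages` and returns the first too young message (the cursor) or nil.
def drop_too_young!(messages, delay_ms, now)
  # Messages with a timestamp later than this border are too young
  border = now - delay_ms / 1_000.0
  applied = false
  cursor = nil

  messages.delete_if do |message|
    if message.timestamp > border
      applied = true
      cursor ||= message
    end

    # Once one message is too young, it and every later message is removed
    applied
  end

  cursor
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
batch = [
  FakeMessage.new(0, now - 10), # 10 s old, old enough for a 5 s delay
  FakeMessage.new(1, now - 6),  # 6 s old, old enough
  FakeMessage.new(2, now - 2),  # 2 s old, too young
  FakeMessage.new(3, now - 1)   # 1 s old, too young
]

cursor = drop_too_young!(batch, 5_000, now)

batch.map(&:offset) # => [0, 1]
cursor.offset       # => 2
```

Because the block returns the running `applied` flag, removal starts at the first too young message and continues to the end of the batch, so the cursor marks where processing would later pick up again.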
  
#timeout ⇒ Integer
Returns timeout delay in ms.
      # File 'lib/karafka/pro/processing/filters/delayer.rb', line 52
      def timeout
        return 0 unless @cursor

        timeout = (@delay / 1_000.0) - (::Time.now.utc - @cursor.timestamp)

        timeout <= 0 ? 0 : timeout * 1_000
      end
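The arithmetic in #timeout can be checked with a short standalone sketch. `remaining_delay_ms` is a hypothetical helper, and the cursor timestamp and `now` are passed as arguments (an assumption made for determinism) rather than read from `@cursor` and `::Time.now.utc`.

```ruby
# Illustrative helper, not part of Karafka: mirrors the arithmetic in #timeout.
# delay_ms    - configured delay in milliseconds
# cursor_time - timestamp (Time) of the first too young message, or nil
# now         - current time (Time)
def remaining_delay_ms(delay_ms, cursor_time, now)
  return 0 unless cursor_time

  # Delay in seconds minus the age of the cursor message in seconds
  remaining = (delay_ms / 1_000.0) - (now - cursor_time)

  # Never negative; convert back to milliseconds
  remaining <= 0 ? 0 : remaining * 1_000
end

now = Time.utc(2024, 1, 1, 12, 0, 0)

remaining_delay_ms(5_000, now - 2, now) # => 3000.0
remaining_delay_ms(5_000, now - 7, now) # => 0
remaining_delay_ms(5_000, nil, now)     # => 0
```

With a 5 s delay and a cursor message that is 2 s old, 3 s remain. In plain Ruby the positive case evaluates to a Float, since it is derived from a division by `1_000.0`.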