Class: Deimos::Utils::LagReporter
- Inherits:
- 
      Object
      
        - Object
- Deimos::Utils::LagReporter
 
- Extended by:
- Mutex_m
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
Class that manages reporting lag.
Defined Under Namespace
Classes: ConsumerGroup, Topic
Class Method Summary collapse
- .heartbeat(payload) ⇒ void
- 
  
    
      .message_processed(payload)  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    offset_lag = event.payload.fetch(:offset_lag) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition). 
- .offset_seek(payload) ⇒ void
- 
  
    
      .reset  ⇒ void 
    
    
  
  
  
  
  
  
  
  
  
    Reset all group information. 
Class Method Details
.heartbeat(payload) ⇒ void
This method returns an undefined value.
| 143 144 145 146 147 148 149 150 151 152 153 154 | # File 'lib/deimos/utils/lag_reporter.rb', line 143 def heartbeat(payload) group = payload[:group_id] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) consumer_group = @groups[group.to_s] payload[:topic_partitions].each do |topic, partitions| partitions.each do |partition| consumer_group.report_lag(topic, partition) end end end end | 
.message_processed(payload) ⇒ void
This method returns an undefined value.
offset_lag = event.payload.fetch(:offset_lag) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition)
| 115 116 117 118 119 120 121 122 123 124 125 | # File 'lib/deimos/utils/lag_reporter.rb', line 115 def (payload) offset = payload[:offset] || payload[:last_offset] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].assign_current_offset(topic, partition, offset) end end | 
.offset_seek(payload) ⇒ void
This method returns an undefined value.
| 129 130 131 132 133 134 135 136 137 138 139 | # File 'lib/deimos/utils/lag_reporter.rb', line 129 def offset_seek(payload) offset = payload[:offset] topic = payload[:topic] group = payload[:group_id] partition = payload[:partition] synchronize do @groups[group.to_s] ||= ConsumerGroup.new(group) @groups[group.to_s].assign_current_offset(topic, partition, offset) end end | 
.reset ⇒ void
This method returns an undefined value.
Reset all group information.
| 105 106 107 | # File 'lib/deimos/utils/lag_reporter.rb', line 105 def reset @groups = {} end |