Class: Deimos::Utils::LagReporter::Topic
- Inherits:
-
Object
- Object
- Deimos::Utils::LagReporter::Topic
- Defined in:
- lib/deimos/utils/lag_reporter.rb
Overview
A Kafka topic that keeps a hash of partition => last known current offset.
Instance Attribute Summary collapse
- #consumer_group ⇒ ConsumerGroup
- #partition_current_offsets ⇒ Hash<Integer, Integer>
- #topic_name ⇒ String
Instance Method Summary collapse
- #assign_current_offset(partition, offset) ⇒ Object
- #compute_lag(partition, offset) ⇒ Object
-
#initialize(topic_name, group) ⇒ Topic
constructor
A new instance of Topic.
- #report_lag(partition) ⇒ Object
Constructor Details
#initialize(topic_name, group) ⇒ Topic
Returns a new instance of Topic.
52 53 54 55 56 |
# File 'lib/deimos/utils/lag_reporter.rb', line 52

# Builds a topic tracker with no partition offsets recorded yet.
#
# @param topic_name [String] name of the topic being tracked
# @param group [ConsumerGroup] consumer group this topic belongs to
def initialize(topic_name, group)
  self.topic_name = topic_name
  self.consumer_group = group
  # Starts empty; entries are added by #assign_current_offset.
  self.partition_current_offsets = {}
end
Instance Attribute Details
#consumer_group ⇒ ConsumerGroup
48 49 50 |
# File 'lib/deimos/utils/lag_reporter.rb', line 48

# @return [ConsumerGroup] the consumer group assigned to this topic
def consumer_group
  @consumer_group
end
#partition_current_offsets ⇒ Hash<Integer, Integer>
46 47 48 |
# File 'lib/deimos/utils/lag_reporter.rb', line 46

# @return [Hash<Integer, Integer>] partition => last known current offset
def partition_current_offsets
  @partition_current_offsets
end
#topic_name ⇒ String
44 45 46 |
# File 'lib/deimos/utils/lag_reporter.rb', line 44

# @return [String] the name of this topic
def topic_name
  @topic_name
end
Instance Method Details
#assign_current_offset(partition, offset) ⇒ Object
59 60 61 |
# File 'lib/deimos/utils/lag_reporter.rb', line 59

# Records the current offset for a partition, replacing any earlier value.
#
# @param partition [#to_i] partition identifier; coerced with #to_i so the
#   hash is always keyed by Integer
# @param offset [Integer] offset to store for that partition
# @return [Object] the offset that was stored
def assign_current_offset(partition, offset)
  self.partition_current_offsets[partition.to_i] = offset
end
#compute_lag(partition, offset) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/deimos/utils/lag_reporter.rb', line 64

# Computes the lag for a partition: the topic's last offset, as reported by a
# freshly created Kafka client, minus the given offset.
#
# This is best effort. Any StandardError raised while creating the client or
# fetching the last offset is logged at debug level and 0 is returned instead.
#
# @param partition [Object] partition to query; passed to the client as given
# @param offset [Integer] offset to measure the lag against
# @return [Integer] the computed lag, or 0 if it could not be computed
def compute_lag(partition, offset)
  client = Phobos.create_kafka_client
  client.last_offset_for(self.topic_name, partition) - offset
rescue StandardError => e
  # Deliberately swallowed so a lookup failure never breaks the caller, but
  # the cause is included in the log so it is not lost entirely.
  Deimos.config.logger.
    debug("Error computing lag for #{self.topic_name}, will retry: #{e.class}: #{e.message}")
  0
ensure
  # A new client is created on every call, so release its connections here.
  # A failure to close is logged rather than raised, so it cannot replace the
  # value computed above.
  begin
    client&.close
  rescue StandardError => e
    Deimos.config.logger.
      debug("Error closing Kafka client for #{self.topic_name}: #{e.class}: #{e.message}")
  end
end
#report_lag(partition) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/deimos/utils/lag_reporter.rb', line 77

# Logs and publishes the lag for a partition as a `consumer_lag` gauge.
#
# Does nothing when no current offset has been recorded for the partition.
#
# @param partition [#to_i] partition to report on; coerced with #to_i only for
#   the offset lookup, and used as given everywhere else
# @return [Object, nil] nil when there is no recorded offset; otherwise
#   whatever the metrics call returns (nil if no metrics backend is set)
def report_lag(partition)
  current_offset = self.partition_current_offsets[partition.to_i]
  return unless current_offset

  lag = compute_lag(partition, current_offset)
  group = self.consumer_group.id
  Deimos.config.logger.
    debug("Sending lag: #{group}/#{partition}: #{lag}")

  tags = [
    "consumer_group:#{group}",
    "partition:#{partition}",
    "topic:#{self.topic_name}"
  ]
  # The metrics backend may be absent, hence the safe navigation.
  Deimos.config.metrics&.gauge('consumer_lag', lag, tags: tags)
end