Class: Deimos::Utils::LagReporter
- Inherits: Object
  - Object
  - Deimos::Utils::LagReporter
- Extended by: Mutex_m
- Defined in: lib/deimos/utils/lag_reporter.rb
Overview
Class that manages reporting lag.
Defined Under Namespace
Classes: ConsumerGroup, Topic
Class Method Summary
- .heartbeat(payload) ⇒ Object
- .message_processed(payload) ⇒ Object
  Records the current offset for the payload's group, topic, and partition.
- .offset_seek(payload) ⇒ Object
- .reset ⇒ Object
  Reset all group information.
Class Method Details
.heartbeat(payload) ⇒ Object
    # File 'lib/deimos/utils/lag_reporter.rb', line 132
    def heartbeat(payload)
      group = payload[:group_id]
      synchronize do
        @groups[group.to_s] ||= ConsumerGroup.new(group)
        consumer_group = @groups[group.to_s]
        payload[:topic_partitions].each do |topic, partitions|
          partitions.each do |partition|
            consumer_group.report_lag(topic, partition)
          end
        end
      end
    end
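The nested loop in heartbeat implies a payload that carries a :group_id and a :topic_partitions hash mapping each topic to a list of partitions. The following is a minimal sketch of that traversal only. The payload values are invented for illustration, and a plain array stands in for ConsumerGroup#report_lag, so nothing here is taken from Deimos itself.

```ruby
# Hypothetical heartbeat payload. The key names come from the method body
# above; the values are made up for this sketch.
payload = {
  group_id: 'my-group',
  topic_partitions: { 'orders' => [0, 1], 'users' => [0] }
}

# Stand-in for ConsumerGroup#report_lag: record each (topic, partition) pair
# in the order the loop visits it.
visited = []
payload[:topic_partitions].each do |topic, partitions|
  partitions.each do |partition|
    visited << [topic, partition]
  end
end

p visited
```

Each topic/partition pair is visited once, which suggests lag is reported per partition rather than per topic.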
.message_processed(payload) ⇒ Object
Records the current offset for a consumer group's topic partition. The source comment lists the payload keys :offset_lag, :group_id, :topic, and :partition (read via event.payload.fetch); the method body reads :offset (falling back to :last_offset), :topic, :group_id, and :partition.
    # File 'lib/deimos/utils/lag_reporter.rb', line 106
    def message_processed(payload)
      offset = payload[:offset] || payload[:last_offset]
      topic = payload[:topic]
      group = payload[:group_id]
      partition = payload[:partition]
      synchronize do
        @groups[group.to_s] ||= ConsumerGroup.new(group)
        @groups[group.to_s].assign_current_offset(topic, partition, offset)
      end
    end
.offset_seek(payload) ⇒ Object
    # File 'lib/deimos/utils/lag_reporter.rb', line 119
    def offset_seek(payload)
      offset = payload[:offset]
      topic = payload[:topic]
      group = payload[:group_id]
      partition = payload[:partition]
      synchronize do
        @groups[group.to_s] ||= ConsumerGroup.new(group)
        @groups[group.to_s].assign_current_offset(topic, partition, offset)
      end
    end
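Both message_processed and offset_seek follow the same pattern: look up (or lazily create) the group by its string id inside a lock, then store the offset for a topic/partition pair. The sketch below illustrates that pattern in isolation. OffsetRegistry, record, and current_offset are hypothetical names, a nested hash replaces ConsumerGroup, and a plain Mutex is used in place of Mutex_m, so this should be read as an approximation rather than the Deimos implementation.

```ruby
# Hypothetical stand-in for LagReporter's group bookkeeping.
class OffsetRegistry
  def initialize
    @lock = Mutex.new # plain Mutex here; the real class is extended by Mutex_m
    @groups = {}
  end

  # Same shape as message_processed: prefer :offset, fall back to
  # :last_offset, and key groups by their string id.
  def record(payload)
    offset = payload[:offset] || payload[:last_offset]
    key = [payload[:topic], payload[:partition]]
    group = payload[:group_id].to_s
    @lock.synchronize do
      @groups[group] ||= {}
      @groups[group][key] = offset
    end
  end

  # Returns the last recorded offset, or nil if nothing was recorded.
  def current_offset(group, topic, partition)
    @lock.synchronize { @groups.dig(group.to_s, [topic, partition]) }
  end

  # Drops all recorded group information.
  def reset
    @lock.synchronize { @groups = {} }
  end
end

registry = OffsetRegistry.new
registry.record(group_id: :billing, topic: 'orders', partition: 0, last_offset: 41)
registry.record(group_id: 'billing', topic: 'orders', partition: 0, offset: 42)
p registry.current_offset('billing', 'orders', 0)
registry.reset
p registry.current_offset('billing', 'orders', 0)
```

Because groups are keyed by group.to_s, a symbol and a string group id resolve to the same entry, which is why the second record call overwrites the first.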
.reset ⇒ Object
Reset all group information.
    # File 'lib/deimos/utils/lag_reporter.rb', line 97
    def reset
      @groups = {}
    end