Class: Karafka::Pro::Instrumentation::PerformanceTracker
- Inherits:
-
Object
- Object
- Karafka::Pro::Instrumentation::PerformanceTracker
- Includes:
- Singleton
- Defined in:
- lib/karafka/pro/instrumentation/performance_tracker.rb
Overview
Note:
Even if we have some race-conditions here it is relevant due to the quantity of data. This is why we do not mutex it.
Tracker used to keep track of performance metrics It provides insights that can be used to optimize processing flow
Instance Method Summary collapse
-
#initialize ⇒ PerformanceTracker
constructor
Builds up nested concurrent hash for data tracking.
-
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition.
-
#processing_time_p95(topic, partition) ⇒ Float
P95 processing time of a single message from a single topic partition.
Constructor Details
#initialize ⇒ PerformanceTracker
Builds up nested concurrent hash for data tracking
48 49 50 51 52 53 54 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 48 def initialize @processing_times = Hash.new do |topics_hash, topic| topics_hash[topic] = Hash.new do |partitions_hash, partition| partitions_hash[partition] = [] end end end |
Instance Method Details
#on_consumer_consumed(event) ⇒ Object
Tracks time taken to process a single message of a given topic partition
71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 71 def on_consumer_consumed(event) consumer = event[:caller] = consumer. topic = ..topic partition = ..partition samples = @processing_times[topic][partition] samples << (event[:time] / .size) return unless samples.size > SAMPLES_COUNT samples.shift end |
#processing_time_p95(topic, partition) ⇒ Float
Returns p95 processing time of a single message from a single topic partition.
59 60 61 62 63 64 65 66 |
# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 59 def processing_time_p95(topic, partition) values = @processing_times[topic][partition] return 0 if values.empty? return values.first if values.size == 1 percentile(0.95, values) end |