Class: Karafka::Pro::Instrumentation::PerformanceTracker

Inherits:
Object
  • Object
Includes:
Singleton
Defined in:
lib/karafka/pro/instrumentation/performance_tracker.rb

Overview

Note:

Even if there are some race conditions here, they are irrelevant given the quantity of data collected. This is why access is not guarded by a mutex.

Tracker used to keep track of performance metrics. It provides insights that can be used to optimize the processing flow.

Instance Method Summary

Constructor Details

#initialize ⇒ PerformanceTracker

Builds up the nested hash used for data tracking (topic, then partition, then an array of samples)



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 48

def initialize
  @processing_times = Hash.new do |topics_hash, topic|
    topics_hash[topic] = Hash.new do |partitions_hash, partition|
      partitions_hash[partition] = []
    end
  end
end
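
As a minimal standalone sketch of how the nested default blocks above behave (the topic name and sample values are made up for illustration), reading a missing topic or partition creates the intermediate structures on first access:

```ruby
# Standalone sketch of the nested layout: topic => partition => samples.
processing_times = Hash.new do |topics_hash, topic|
  topics_hash[topic] = Hash.new do |partitions_hash, partition|
    partitions_hash[partition] = []
  end
end

# Reading a missing key runs the default block and stores its result,
# so no explicit initialization is needed before appending.
processing_times['events'][0] << 12.5
processing_times['events'][0] << 7.5

# A plain read of an unseen partition also creates an empty samples array.
processing_times['events'][1]

p processing_times
```

One consequence of this design is that a lookup for a topic partition that was never consumed still adds an entry to the hash.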

Instance Method Details

#on_consumer_consumed(event) ⇒ Object

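Tracks the time taken to process a single message of a given topic partition. The value recorded is an average: the processing time of the whole batch divided by the number of messages in it.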

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 71

def on_consumer_consumed(event)
  consumer = event[:caller]
  messages = consumer.messages
  topic = messages.metadata.topic
  partition = messages.metadata.partition

  samples = @processing_times[topic][partition]
  samples << (event[:time] / messages.size)

  return unless samples.size > SAMPLES_COUNT

  samples.shift
end
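
The append-then-trim logic above keeps only the most recent samples per topic partition. The following is a rough sketch of that sliding window in isolation. The helper name `record_sample`, the `limit` of 3 and the timings are all hypothetical; the real value of `SAMPLES_COUNT` is not shown on this page.

```ruby
# Hypothetical helper mirroring the logic of on_consumer_consumed:
# store the average per-message time, then drop the oldest sample
# once the window grows past the limit.
def record_sample(samples, batch_time, batch_size, limit: 3)
  samples << (batch_time / batch_size)
  samples.shift if samples.size > limit
  samples
end

samples = []
record_sample(samples, 100.0, 10) # average of 10.0 per message
record_sample(samples, 90.0, 30)  # average of 3.0
record_sample(samples, 50.0, 5)   # average of 10.0
record_sample(samples, 80.0, 4)   # average of 20.0, the oldest sample is dropped

p samples
```

Because each sample is a per-batch average, a single slow message inside a large batch is smoothed out rather than recorded on its own.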

#processing_time_p95(topic, partition) ⇒ Float

Returns p95 processing time of a single message from a single topic partition.

Parameters:

  • topic (String)
  • partition (Integer)

Returns:

  • (Float)

    p95 processing time of a single message from a single topic partition



# File 'lib/karafka/pro/instrumentation/performance_tracker.rb', line 59

def processing_time_p95(topic, partition)
  values = @processing_times[topic][partition]

  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end
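
The `percentile` helper called above is not shown on this page. As an illustration only, here is one common way to compute a percentile, using linear interpolation between the two closest ranks. This is an assumption and may differ from what the library actually does; the standalone `processing_time_p95` below takes the samples directly and is likewise hypothetical.

```ruby
# Hypothetical percentile helper (linear interpolation between closest ranks).
# An assumption for illustration, not necessarily the library's implementation.
def percentile(fraction, values)
  sorted = values.sort
  rank = fraction * (sorted.size - 1)      # fractional index into the sorted list
  lower = rank.floor
  upper = [lower + 1, sorted.size - 1].min # clamp so the last index is never exceeded
  weight = rank - lower
  sorted[lower] + weight * (sorted[upper] - sorted[lower])
end

# Standalone variant of the method above, with the same edge-case handling.
def processing_time_p95(values)
  return 0 if values.empty?
  return values.first if values.size == 1

  percentile(0.95, values)
end

p processing_time_p95([])
p processing_time_p95([7.0])
p processing_time_p95([5.0, 1.0, 4.0, 2.0, 3.0])
```

With the five samples in the last call, the interpolated p95 falls between the two largest values, at approximately 4.8.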