Class: Karafka::Core::Monitoring::StatisticsDecorator

Inherits:
Object
Includes:
Helpers::Time
Defined in:
lib/karafka/core/monitoring/statistics_decorator.rb

Overview

Many of the librdkafka statistics are absolute, ever-growing values rather than gauges. For example, the number of messages sent is a running total, not the number of messages sent since the last statistics report. This decorator calculates the difference against the previously emitted statistics, so each emission carries the deltas alongside the original values.

It adds two extra values for each numeric key:

- KEY_d - delta between the previous value and the current one
- KEY_fd - freeze duration - describes how long the delta has remained unchanged (zero)
         and can be useful for detecting values that "hang" for an extended period of time
         and do not change at all (delta always zero). This value is in ms, for
         consistency with the other time-related values we use. A newly introduced key
         (one that had no value in the previous emission) starts at a freeze duration of
         zero, since there is no prior value it could have been "frozen" against.

The _d and _fd suffixes are reserved. For every numeric KEY the decorator writes KEY_d and KEY_fd into the same hash, so if the input already contains a real key literally named KEY_d or KEY_fd (for another numeric KEY present in that hash) it is overwritten by the computed delta/freeze duration. librdkafka statistics never use these suffixes, so this does not happen with real stats; it only matters if you feed custom data through the decorator.
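
The delta and freeze-duration rules described above can be illustrated with a minimal, flat-hash sketch. This is not the decorator's actual implementation: `decorate_flat` is a hypothetical helper, it ignores nested hashes and key filtering, and it assumes that a key seen for the first time gets a delta of 0. The elapsed time between emissions is passed in explicitly to keep the example deterministic.

```ruby
# Simplified illustration of the KEY_d / KEY_fd bookkeeping on a flat hash.
# `decorate_flat` is a hypothetical helper, not part of karafka-core.
def decorate_flat(previous, current, elapsed_ms)
  # Snapshot the keys first, because new keys are added while iterating
  current.keys.each do |key|
    value = current[key]
    next unless value.is_a?(Numeric)

    prev_value = previous[key]

    if prev_value.is_a?(Numeric)
      delta = value - prev_value
      # Unchanged value: extend the freeze duration. Changed value: reset it.
      freeze_ms = delta.zero? ? previous.fetch("#{key}_fd", 0) + elapsed_ms : 0
    else
      # First sighting of the key: freeze duration starts at zero
      # (the delta of 0 is an assumption of this sketch)
      delta = 0
      freeze_ms = 0
    end

    current["#{key}_d"] = delta
    current["#{key}_fd"] = freeze_ms
  end

  current
end

first  = decorate_flat({},     { "txmsgs" => 10 }, 0)
second = decorate_flat(first,  { "txmsgs" => 15 }, 5_000)
third  = decorate_flat(second, { "txmsgs" => 15 }, 5_000)
fourth = decorate_flat(third,  { "txmsgs" => 15 }, 5_000)

second["txmsgs_d"]  # => 5
second["txmsgs_fd"] # => 0
third["txmsgs_fd"]  # => 5000
fourth["txmsgs_fd"] # => 10000
```

The freeze duration accumulates only while the value stays unchanged, and it resets to zero as soon as the delta is non-zero again.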

Instance Method Summary

Methods included from Helpers::Time

#float_now, #monotonic_now

Constructor Details

#initialize(excluded_keys: [], only_keys: []) ⇒ StatisticsDecorator

Returns a new instance of StatisticsDecorator.

Parameters:

  • excluded_keys (Array<String>) (defaults to: [])

    list of key names to skip entirely during decoration. Excluded keys are not recursed into and not decorated with delta/freeze duration suffixes. This is useful for skipping large subtrees of the librdkafka statistics that are not consumed by the application (e.g. broker toppars, window stats like int_latency, outbuf_latency, throttle, batchsize, batchcnt, req).

  • only_keys (Array<String>) (defaults to: [])

    when set, only these numeric keys are decorated with delta/freeze duration suffixes, and only at the levels of the known librdkafka statistics tree: the root, each broker, each topic, each partition and cgrp. The decorator navigates that known structure directly and decorates the listed keys it finds at each of those levels. It deliberately does NOT descend into nested sub-objects within a broker, topic, partition or cgrp (e.g. broker window stats like rtt/throttle/int_latency, or the toppars map) -- skipping that descent is exactly what makes this mode cheap on large clusters. Non-librdkafka hash children found at the root are still fully recursed for correctness. If you need a key nested inside one of those sub-objects decorated, use the default full-decoration mode. When empty (default), all numeric keys at every depth are decorated.
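
When both options are given, excluded_keys takes precedence over only_keys. The following standalone sketch mirrors the filtering done in the constructor source shown on this page; `effective_only_keys` is a hypothetical helper name used only for illustration, and the key names are sample values.

```ruby
# Hypothetical helper mirroring the constructor's filtering of only_keys.
# Returns nil when only_keys is empty (full-decoration mode), otherwise a
# frozen list of only_keys with every excluded key removed.
def effective_only_keys(excluded_keys, only_keys)
  return nil if only_keys.empty?

  # Hash lookup instead of Array#include? for constant-time exclusion checks
  excluded = excluded_keys.each_with_object({}) { |k, h| h[k] = true }

  only_keys.reject { |k| excluded.key?(k) }.freeze
end

effective_only_keys([], [])                           # => nil
effective_only_keys([], %w[txmsgs rxmsgs])            # => ["txmsgs", "rxmsgs"]
effective_only_keys(%w[rxmsgs], %w[txmsgs rxmsgs])    # => ["txmsgs"]
effective_only_keys(%w[txmsgs rxmsgs], %w[txmsgs])    # => []
```

Note the last case: when every entry of only_keys is also excluded, the result is an empty array rather than nil, so it is not the same as leaving only_keys unset.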



# File 'lib/karafka/core/monitoring/statistics_decorator.rb', line 51

def initialize(excluded_keys: [], only_keys: [])
  @previous = EMPTY_HASH
  # Operate on ms precision only
  @previous_at = monotonic_now.round
  # Cache for memoized suffix keys to avoid repeated string allocations
  @suffix_keys_cache = {}
  # Frozen hash for O(1) key exclusion lookup, nil when empty to avoid per-key
  # lookups in the hot loop when no exclusions are configured
  @excluded_keys = unless excluded_keys.empty?
    excluded_keys.each_with_object({}) { |k, h| h[k] = true }.freeze
  end
  # Frozen array for direct-access decoration, nil when empty to use full decoration.
  # Exclusions are applied once here (excluded_keys wins over only_keys), so the hot
  # decoration loop iterates an already-filtered list and never re-checks exclusions.
  @only_keys = unless only_keys.empty?
    effective = @excluded_keys ? only_keys.reject { |k| @excluded_keys.key?(k) } : only_keys
    effective.freeze
  end
end

Instance Method Details

#call(emitted_stats) ⇒ Hash

Note:

We modify the emitted statistics in place instead of creating a new hash, and the decorated hash is frozen before it is returned. Since we don't expose any API to get the raw data, users can assume that the result of this decoration is the proper raw stats that they can use.

Returns emitted statistics extended with the diff data.

Parameters:

  • emitted_stats (Hash)

    original emitted statistics

Returns:

  • (Hash)

    emitted statistics extended with the diff data



# File 'lib/karafka/core/monitoring/statistics_decorator.rb', line 76

def call(emitted_stats)
  current_at = monotonic_now.round
  change_d = current_at - @previous_at

  diff(
    @previous,
    emitted_stats,
    change_d
  )

  @previous = emitted_stats
  @previous_at = current_at

  emitted_stats.freeze
end
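
Because #call mutates its argument and returns it frozen, the caller's own reference to the hash becomes frozen as well. The sketch below reproduces only that mutate-then-freeze contract with a hypothetical stand-in class (`InPlaceDecoratorSketch` is not part of karafka-core and performs no real delta calculation), to show what a caller can and cannot do with the result.

```ruby
# Hypothetical stand-in that mimics only the mutate-then-freeze contract of #call.
class InPlaceDecoratorSketch
  def call(stats)
    stats["decorated"] = true # placeholder for the real delta bookkeeping
    stats.freeze
  end
end

stats = { "txmsgs" => 10 }
result = InPlaceDecoratorSketch.new.call(stats)

result.equal?(stats) # => true, the same object is returned, not a copy
stats.frozen?        # => true, the caller's hash is now frozen

# Writing to the frozen hash raises FrozenError
write_rejected = false
begin
  stats["extra"] = 1
rescue FrozenError
  write_rejected = true
end

# Object#dup returns an unfrozen shallow copy that can be extended safely
copy = result.dup
copy["extra"] = 1
```

If application code needs to attach its own fields to the statistics, working on a copy like this avoids touching the hash that the decorator keeps as its previous state.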