Class: Karafka::Instrumentation::Callbacks::ConsumerGroups::Statistics

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/instrumentation/callbacks/consumer_groups/statistics.rb

Overview

Statistics callback handler

See Also:

  • for details on why we decorate those statistics

Instance Method Summary collapse

Constructor Details

#initialize(subscription_group_id, group_id, client_name) ⇒ Statistics

Returns a new instance of Statistics.

Parameters:

  • subscription_group_id (String)
  • group_id (String)

    id of the owning group (consumer group today)

  • client_name (String)

    rdkafka client name



18
19
20
21
22
23
# File 'lib/karafka/instrumentation/callbacks/consumer_groups/statistics.rb', line 18

def initialize(subscription_group_id, group_id, client_name)
  @subscription_group_id = subscription_group_id
  @group_id = group_id
  @client_name = client_name
  @statistics_decorator = Karafka::Core::Monitoring::StatisticsDecorator.new
end

Instance Method Details

#call(statistics) ⇒ Object

Emits decorated statistics to the monitor

Parameters:

  • statistics (Hash)

    rdkafka statistics



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/karafka/instrumentation/callbacks/consumer_groups/statistics.rb', line 27

def call(statistics)
  # Emit only statistics related to our client
  # rdkafka does not have per-instance statistics hook, thus we need to make sure that we
  # emit only stats that are related to current producer. Otherwise we would emit all of
  # all the time.
  return unless @client_name == statistics["name"]

  monitor.instrument(
    "statistics.emitted",
    subscription_group_id: @subscription_group_id,
    consumer_group_id: @group_id,
    group_id: @group_id,
    statistics: @statistics_decorator.call(statistics)
  )
# We need to catch and handle any potential errors coming from the instrumentation pipeline
# as otherwise, in case of statistics which run in the main librdkafka thread, any crash
# will hang the whole process.
rescue => e
  monitor.instrument(
    "error.occurred",
    caller: self,
    subscription_group_id: @subscription_group_id,
    consumer_group_id: @group_id,
    group_id: @group_id,
    type: "callbacks.statistics.error",
    error: e
  )
end