Class: Karafka::Web::Tracking::Consumers::Sampler

Inherits:
Sampler
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/web/tracking/consumers/sampler.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/os.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/jobs.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/base.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/server.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/network.rb,
lib/karafka/web/tracking/consumers/sampler/metrics/container.rb,
lib/karafka/web/tracking/consumers/sampler/enrichers/consumer_groups.rb

Overview

Sampler for fetching and storing metric samples about the consumer process.

Defined Under Namespace

Modules: Enrichers, Metrics

Constant Summary collapse

SCHEMA_VERSION =

Current schema version. This is used for detecting incompatible changes and for not using outdated data during upgrades.

"1.7.0"
DEFAULT_POLL_INTERVAL_MS =

Default max.poll.interval.ms value in milliseconds (5 minutes). This is the librdkafka default when not explicitly configured.

300_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Sampler

#karafka_core_version, #karafka_version, #karafka_web_version, #librdkafka_version, #process_id, #rdkafka_version, #ruby_version, #waterdrop_version

Constructor Details

#initializeSampler

Returns a new instance of Sampler.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 42

# Builds a sampler with empty tracking state and picks the metrics collectors it will use.
#
# Consumer group and subscription group containers are hashes that create their entries
# on first access, so callers can write into them without checking for presence first.
def initialize
  super

  @windows = Helpers::Ttls::Windows.new
  @counters = COUNTERS_BASE.dup

  # Each consumer group entry starts with its id and no subscription groups
  @consumer_groups = Hash.new do |groups, cg_id|
    groups[cg_id] = { id: cg_id, subscription_groups: {} }
  end

  @subscription_groups = Hash.new do |groups, sg_id|
    # Topic => partition => details, with every level created on first access.
    # We track those details in case we need to fill statistical gaps for
    # transactional consumers
    topics = Hash.new do |by_topic, topic|
      by_topic[topic] = Hash.new do |by_partition, partition|
        by_partition[partition] = { seek_offset: -1, transactional: false }
      end
    end

    groups[sg_id] = {
      id: sg_id,
      polled_at: monotonic_now,
      poll_interval: DEFAULT_POLL_INTERVAL_MS,
      topics: topics
    }
  end

  @errors = []
  @pauses = {}
  @jobs = {}
  @shell = MemoizedShell.new
  @memory_total_usage = 0
  @memory_usage = 0
  @cpu_usage = [-1, -1, -1]

  # Select and instantiate appropriate system metrics collector based on environment:
  # the container-aware collector when it reports itself active, the OS-based one otherwise
  metrics_class = Metrics::Container.active? ? Metrics::Container : Metrics::Os

  @system_metrics = metrics_class.new(@shell)
  @network_metrics = Metrics::Network.new(@windows)
  @server_metrics = Metrics::Server.new
end

Instance Attribute Details

#consumer_groupsObject (readonly)

Returns the value of attribute consumer_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Hash] consumer groups tracking data keyed by consumer group id. The hash is
#   built in #initialize with a default block, so reading a missing id creates and stores
#   an entry of the form `{ id: cg_id, subscription_groups: {} }`
def consumer_groups
  @consumer_groups
end

#countersObject (readonly)

Returns the value of attribute counters.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Object] counters container created in #initialize as a copy (`dup`) of
#   COUNTERS_BASE; #clear resets each of its values to 0.
#   NOTE(review): presumably a Hash of counter name => Integer - confirm against COUNTERS_BASE
def counters
  @counters
end

#errorsObject (readonly)

Returns the value of attribute errors.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Array] collected errors; starts empty in #initialize and is emptied by #clear
def errors
  @errors
end

#jobsObject (readonly)

Returns the value of attribute jobs.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Hash] tracked jobs; starts empty in #initialize. Only its values are included
#   in the report built by #to_report
def jobs
  @jobs
end

#pausesObject (readonly)

Returns the value of attribute pauses.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Hash] tracked pauses; starts empty in #initialize and is not touched by #clear
def pauses
  @pauses
end

#subscription_groupsObject (readonly)

Returns the value of attribute subscription_groups.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Hash] subscription groups tracking data keyed by subscription group id. The
#   hash is built in #initialize with a default block, so reading a missing id creates and
#   stores an entry with `:id`, `:polled_at`, `:poll_interval` and a nested `:topics` hash
#   (topic => partition => `{ seek_offset:, transactional: }`)
def subscription_groups
  @subscription_groups
end

#windowsObject (readonly)

Returns the value of attribute windows.



12
13
14
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 12

# @return [Helpers::Ttls::Windows] windows instance created in #initialize; the same
#   instance is handed to the network metrics collector there
def windows
  @windows
end

Instance Method Details

#clearObject

Note:

We do not clear processing or pauses or other things like this because we track their states and not values, so they need to be tracked between flushes.

Clears counters and errors. Used after data is reported by the reporter to start collecting new samples.



149
150
151
152
153
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 149

# Resets every counter value to zero and empties the collected errors, so the next
# reporting cycle starts from a clean slate.
#
# @note Counter keys are kept; only their values are overwritten. Other tracked state
#   (jobs, pauses, etc.) is left untouched.
def clear
  @counters.each_key do |name|
    @counters[name] = 0
  end

  @errors.clear
end

#sampleObject

Note:

This should run before any mutex, so other threads can continue as those operations may invoke shell commands



157
158
159
160
161
162
163
164
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 157

# Refreshes the cached process-level samples: invokes `memory_threads_ps` and then stores
# the results of `memory_usage`, `memory_total_usage`, `cpu_usage` and `threads` in the
# matching instance variables.
#
# NOTE(review): `memory_threads_ps` is called first and its return value is not used here;
# presumably it prepares data the following helpers read - confirm before reordering.
def sample
  memory_threads_ps

  @memory_usage = memory_usage
  @memory_total_usage = memory_total_usage
  @cpu_usage = cpu_usage
  @threads = threads
end

#to_reportHash

Returns report hash with all the details about consumer operations.

Returns:

  • (Hash)

    report hash with all the details about consumer operations



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 102

# Builds the consumer process report from the currently tracked state.
#
# @return [Hash] report hash with the schema version, report type, dispatch time and the
#   `:process`, `:versions`, `:stats`, `:consumer_groups` and `:jobs` sections
def to_report
  {
    schema_version: SCHEMA_VERSION,
    type: "consumer",
    dispatched_at: float_now,

    process: {
      id: process_id,
      started_at: started_at,
      status: ::Karafka::App.config.internal.status.to_s,
      execution_mode: ::Karafka::Server.execution_mode.to_s,
      listeners: @server_metrics.listeners,
      workers: workers,
      # Memory and CPU usage values are the ones cached by the last #sample call (or the
      # initial values set in #initialize when #sample did not run yet)
      memory_usage: @memory_usage,
      memory_total_usage: @memory_total_usage,
      memory_size: memory_size,
      cpus: cpus,
      # NOTE(review): `threads` is called directly here although #sample also stores its
      # result in @threads - confirm this is intended
      threads: threads,
      cpu_usage: @cpu_usage,
      tags: Karafka::Process.tags,
      bytes_received: @network_metrics.bytes_received,
      bytes_sent: @network_metrics.bytes_sent
    },

    versions: {
      ruby: ruby_version,
      karafka: karafka_version,
      karafka_core: karafka_core_version,
      karafka_web: karafka_web_version,
      waterdrop: waterdrop_version,
      rdkafka: rdkafka_version,
      librdkafka: librdkafka_version
    },

    # Jobs queue statistics extended with the utilization and the current counters
    # (exposed under the `:total` key)
    stats: jobs_metrics.jobs_queue_statistics.merge(
      utilization: jobs_metrics.utilization
    ).merge(total: @counters),

    consumer_groups: enriched_consumer_groups,
    # Only the tracked job entries are reported, without the keys they are stored under
    jobs: jobs.values
  }
end

#trackObject

We cannot report and track at the same time, which is why we use a mutex here. This makes sure that sample aggregation and counting do not interfere with reporter flushing.



95
96
97
98
99
# File 'lib/karafka/web/tracking/consumers/sampler.rb', line 95

# Runs the given block while holding the reporter mutex, so tracking never overlaps with
# the reporter flushing the data.
#
# @yieldparam sampler [Sampler] this sampler instance
# @return [Object] whatever the block returns
def track
  Reporter::MUTEX.synchronize { yield(self) }
end