Class: OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram

Inherits:
Object
  • Object
show all
Defined in:
lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb

Overview

Contains the implementation of the ExponentialBucketHistogram aggregation

Constant Summary collapse

OVERFLOW_ATTRIBUTE_SET =

rubocop:disable Metrics/ClassLength

{ 'otel.metric.overflow' => true }.freeze
DEFAULT_SIZE =
160
DEFAULT_SCALE =
20
MAX_SCALE =
20
MIN_SCALE =
-10
MIN_MAX_SIZE =
2
MAX_MAX_SIZE =
16_384

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), max_size: DEFAULT_SIZE, max_scale: DEFAULT_SCALE, record_min_max: true, zero_threshold: 0, exemplar_reservoir: nil) ⇒ ExponentialBucketHistogram

The default boundaries are calculated based on default max_size and max_scale values



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 37

# Builds an exponential bucket histogram aggregation.
#
# @param aggregation_temporality temporality preference; defaults to the
#   OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE environment variable,
#   or :delta when that variable is not set
# @param max_size requested bucket size, passed through validate_size
# @param max_scale requested scale, passed through validate_scale
# @param record_min_max stored as @record_min_max
# @param zero_threshold stored as @zero_threshold
# @param exemplar_reservoir reservoir to use; DEFAULT_RESERVOIR when nil/false
def initialize(
  aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
  max_size: DEFAULT_SIZE,
  max_scale: DEFAULT_SCALE,
  record_min_max: true,
  zero_threshold: 0,
  exemplar_reservoir: nil
)
  @aggregation_temporality = AggregationTemporality.determine_temporality(
    aggregation_temporality: aggregation_temporality,
    default: :delta
  )

  # Running statistics start from their identity values.
  @record_min_max = record_min_max
  @min = Float::INFINITY
  @max = -Float::INFINITY
  @sum = @count = 0
  @zero_threshold = zero_threshold
  @zero_count = 0

  # Size and scale go through their validators before being stored.
  @size = validate_size(max_size)
  @scale = validate_scale(max_scale)

  @exemplar_reservoir = exemplar_reservoir || DEFAULT_RESERVOIR
  @exemplar_reservoir_storage = {}

  @mapping = new_mapping(@scale)

  # State carried between cumulative collections; every field gets its own
  # hash, which collect indexes by attribute set.
  %i[positive negative min max sum count zero_count scale].each do |field|
    instance_variable_set(:"@previous_#{field}", {})
  end

  # Mapping caches, likewise keyed by attribute set.
  @mappings = {}
  @previous_mappings = {}
end

Instance Attribute Details

#exemplar_reservoir ⇒ Object (readonly)

Returns the value of attribute exemplar_reservoir.



30
31
32
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 30

# Read-only accessor for the exemplar reservoir held in @exemplar_reservoir.
#
# @return the value of @exemplar_reservoir
def exemplar_reservoir
  @exemplar_reservoir
end

Instance Method Details

#aggregation_temporality ⇒ Object



237
238
239
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 237

# Returns the temporality reported by the object held in
# @aggregation_temporality (delegates to its #temporality method).
def aggregation_temporality
  @aggregation_temporality.temporality
end

#collect(start_time, end_time, data_points) ⇒ Object

When the aggregation temporality is cumulative, the current buckets are merged into the previously collected buckets, downscaling them first when necessary. (rubocop:disable Metrics/MethodLength)



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 78

# Collects the exponential histogram data points for one export cycle.
#
# Delta temporality: every accumulated point is stamped with the collection
# window, given its exemplars and returned; +data_points+ and the mapping
# cache are then cleared.
#
# Cumulative temporality: every accumulated point is merged into the running
# @previous_* state (the previous buckets are downscaled first when the
# scales differ or the merged range requires it). One data point is returned
# for EVERY attribute set seen so far, including sets that received no
# measurement during this cycle, so a cumulative series never disappears
# from an export just because it was idle.
#
# @param start_time start of the collection window (stored as start_time_unix_nano)
# @param end_time end of the collection window (stored as time_unix_nano)
# @param data_points [Hash] attribute set => data point accumulated since the
#   last collection; emptied by this method
# @return [Array] the data points to export
def collect(start_time, end_time, data_points)
  if @aggregation_temporality.delta?
    # Set timestamps and 'move' data point values to result.
    hdps = data_points.values.map! do |hdp|
      hdp.start_time_unix_nano = start_time
      hdp.time_unix_nano = end_time
      reservoir = @exemplar_reservoir_storage[hdp.attributes]
      hdp.exemplars = reservoir&.collect(attributes: hdp.attributes, aggregation_temporality: @aggregation_temporality)
      hdp
    end
    data_points.clear
    @mappings.clear
    hdps
  else
    # CUMULATIVE temporality - merge current data_points into @previous_*
    # and only keep the merged state there.
    merged_data_points = {}

    # This is linear in the number of data points, which is acceptable because
    # data_points only holds what was recorded since the last collection.
    data_points.each do |attributes, hdp|
      # Snapshot the current values before any merging takes place.
      current_positive = hdp.positive
      current_negative = hdp.negative
      current_sum = hdp.sum
      current_min = hdp.min
      current_max = hdp.max
      current_count = hdp.count
      current_zero_count = hdp.zero_count
      current_scale = hdp.scale

      # First sighting of this attribute set: start from empty buckets at the current scale.
      @previous_positive[attributes] ||= current_positive.copy_empty
      @previous_negative[attributes] ||= current_negative.copy_empty
      @previous_scale[attributes] ||= current_scale

      # Both sides can only be merged at the coarser (smaller) of the two scales.
      min_scale = [@previous_scale[attributes], current_scale].min

      low_positive, high_positive = get_low_high_previous_current(
        @previous_positive[attributes],
        current_positive,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )
      low_negative, high_negative = get_low_high_previous_current(
        @previous_negative[attributes],
        current_negative,
        @previous_scale[attributes],
        current_scale,
        min_scale
      )

      # Lower the scale further by whatever change the merged index ranges require.
      min_scale = [
        min_scale - get_scale_change(low_positive, high_positive),
        min_scale - get_scale_change(low_negative, high_negative)
      ].min

      # Bring the previous buckets to the target scale, then fold the current ones in.
      downscale_change = @previous_scale[attributes] - min_scale
      downscale(downscale_change, @previous_positive[attributes], @previous_negative[attributes])

      merge_buckets(@previous_positive[attributes], current_positive, current_scale, min_scale, @aggregation_temporality)
      merge_buckets(@previous_negative[attributes], current_negative, current_scale, min_scale, @aggregation_temporality)

      # Initialise the scalar aggregates on first use, then accumulate.
      @previous_min[attributes] ||= Float::INFINITY
      @previous_max[attributes] ||= -Float::INFINITY
      @previous_sum[attributes] ||= 0
      @previous_count[attributes] ||= 0
      @previous_zero_count[attributes] ||= 0

      @previous_min[attributes] = [@previous_min[attributes], current_min].min
      @previous_max[attributes] = [@previous_max[attributes], current_max].max
      @previous_sum[attributes] += current_sum
      @previous_count[attributes] += current_count
      @previous_zero_count[attributes] += current_zero_count
      @previous_scale[attributes] = min_scale

      merged_data_points[attributes] = build_cumulative_data_point(attributes, start_time, end_time)
      @previous_mappings[attributes] = @mappings[attributes] if @mappings[attributes] # Preserve mapping for next collection
    end

    # Attribute sets recorded in earlier cycles but idle in this one must still
    # be reported with their last cumulative values. (Previously this happened
    # only when data_points was completely empty, so idle series vanished
    # whenever any other series had new measurements.)
    @previous_positive.each_key do |attributes|
      next if merged_data_points.key?(attributes)

      merged_data_points[attributes] = build_cumulative_data_point(attributes, start_time, end_time)
    end

    # Swap current with previous mappings for next cycle
    @mappings = @previous_mappings
    @previous_mappings = {}

    # The data now lives in @previous_*; clearing avoids merging it twice on
    # the next collect.
    data_points.clear
    merged_data_points.values
  end
end

# Builds the cumulative data point for +attributes+ from the merged
# @previous_* state, stamped with the given collection window.
def build_cumulative_data_point(attributes, start_time, end_time)
  reservoir = @exemplar_reservoir_storage[attributes]
  ExponentialHistogramDataPoint.new(
    attributes,
    start_time,
    end_time,
    @previous_count[attributes],
    @previous_sum[attributes],
    @previous_scale[attributes],
    @previous_zero_count[attributes],
    @previous_positive[attributes].dup,
    @previous_negative[attributes].dup,
    0, # flags
    reservoir&.collect(attributes: attributes, aggregation_temporality: @aggregation_temporality), # exemplars
    @previous_min[attributes],
    @previous_max[attributes],
    @zero_threshold
  )
end
private :build_cumulative_data_point

#update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false) ⇒ Object

This corresponds to `aggregate` in the Python implementation: no merging happens here, but the buckets may be rescaled.



224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb', line 224

# Records +amount+ against the data point that belongs to +attributes+.
#
# An existing data point for +attributes+ is reused. Otherwise a new one is
# created while the number of data points is still below
# +cardinality_limit+ - 1; past that, the measurement is routed to the data
# point stored under OVERFLOW_ATTRIBUTE_SET (created on demand).
#
# @param amount the measurement to record
# @param attributes the attribute set of the measurement
# @param data_points [Hash] attribute set => data point
# @param cardinality_limit [Integer] maximum number of data points
# @param exemplar_offer forwarded to update_histogram_data_point
# @return [nil]
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
  target_point =
    if data_points.key?(attributes)
      data_points[attributes]
    elsif data_points.size < cardinality_limit - 1
      create_new_data_point(attributes, data_points)
    else
      # Limit reached: fold the measurement into the shared overflow point.
      data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
    end

  update_histogram_data_point(target_point, attributes, amount, exemplar_offer: exemplar_offer)
  nil
end