Class: StatsD::Instrument::Aggregator
- Inherits:
-
Object
- Object
- StatsD::Instrument::Aggregator
- Defined in:
- lib/statsd/instrument/aggregator.rb
Defined Under Namespace
Classes: Finalizer
Constant Summary collapse
- DEFAULT_MAX_CONTEXT_SIZE =
250
Class Method Summary collapse
Instance Method Summary collapse
-
#aggregate_precompiled_gauge_metric(precompiled_datagram, value) ⇒ void
Aggregates a precompiled metric that can be combined into a single scalar for later flushing.
-
#aggregate_precompiled_increment_metric(precompiled_datagram, value = 1) ⇒ void
Aggregates a precompiled metric that can be combined into a single scalar for later flushing.
-
#aggregate_precompiled_timing_metric(precompiled_datagram, value = 1) ⇒ void
Aggregates a precompiled metric that can be packed into a single datagram for later flushing.
- #aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION, sample_rate: CONST_SAMPLE_RATE) ⇒ Object
- #flush ⇒ Object
- #gauge(name, value, tags: [], no_prefix: false) ⇒ Object
-
#increment(name, value = 1, tags: [], no_prefix: false, sample_rate: CONST_SAMPLE_RATE) ⇒ void
Increment a counter by a given value and save it for later flushing.
-
#initialize(sink, datagram_builder_class, prefix, default_tags, flush_interval: 5.0, max_values: DEFAULT_MAX_CONTEXT_SIZE) ⇒ Aggregator
constructor
A new instance of Aggregator.
Constructor Details
#initialize(sink, datagram_builder_class, prefix, default_tags, flush_interval: 5.0, max_values: DEFAULT_MAX_CONTEXT_SIZE) ⇒ Aggregator
Returns a new instance of Aggregator.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/statsd/instrument/aggregator.rb', line 101 def initialize( sink, datagram_builder_class, prefix, , flush_interval: 5.0, max_values: DEFAULT_MAX_CONTEXT_SIZE ) @sink = sink @datagram_builder_class = datagram_builder_class @metric_prefix = prefix @default_tags = @datagram_builders = { true: nil, false: nil, } @max_values = max_values # Mutex protects the aggregation_state and flush_thread from concurrent access @aggregation_state_mutex = Mutex.new # Mutex protects do_flush to prevent flushing to the sink concurrently @flush_mutex = Mutex.new @aggregation_state = {} @pid = Process.pid @flush_interval = flush_interval start_flush_thread @finalizer = Finalizer.new( @aggregation_state, @sink, @datagram_builders, @datagram_builder_class, @default_tags, @metric_prefix, ) ObjectSpace.define_finalizer( self, self.class.finalize(@finalizer), ) end |
Class Method Details
.finalize(finalizer) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/statsd/instrument/aggregator.rb', line 43 def finalize(finalizer) # The finalizer can be called in trap context, which means we cannot use mutexes here. proc do aggregation_state = finalizer.aggregation_state sink = finalizer.sink datagram_builders = finalizer.datagram_builders datagram_builder_class = finalizer.datagram_builder_class = finalizer. metric_prefix = finalizer.metric_prefix aggregation_state.each do |key, agg_value| if key.is_a?(StatsD::Instrument::CompiledMetric::PrecompiledDatagram) sink << key.to_datagram(agg_value) next end no_prefix = key.no_prefix datagram_builders[no_prefix] ||= datagram_builder_class.new( prefix: no_prefix ? nil : metric_prefix, default_tags: , ) case key.type when COUNT sink << datagram_builders[no_prefix].c( key.name, agg_value, key.sample_rate, key., ) when DISTRIBUTION, MEASURE, HISTOGRAM sink << datagram_builders[no_prefix].timing_value_packed( key.name, key.type.to_s, agg_value, key.sample_rate, key., ) when GAUGE sink << datagram_builders[no_prefix].g( key.name, agg_value, CONST_SAMPLE_RATE, key., ) else StatsD.logger.error { "[#{self.class.name}] Unknown aggregation type: #{key.type}" } end end end end |
Instance Method Details
#aggregate_precompiled_gauge_metric(precompiled_datagram, value) ⇒ void
This method returns an undefined value.
Aggregates a precompiled metric that can be combined into a single scalar for later flushing.
215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/statsd/instrument/aggregator.rb', line 215 def aggregate_precompiled_gauge_metric(precompiled_datagram, value) unless thread_healthcheck # Fallback: emit directly if thread is unhealthy @sink << precompiled_datagram.to_datagram(value) return end @aggregation_state_mutex.synchronize do @aggregation_state[precompiled_datagram] = value end end |
#aggregate_precompiled_increment_metric(precompiled_datagram, value = 1) ⇒ void
This method returns an undefined value.
Aggregates a precompiled metric that can be combined into a single scalar for later flushing.
171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/statsd/instrument/aggregator.rb', line 171 def aggregate_precompiled_increment_metric(precompiled_datagram, value = 1) unless thread_healthcheck # Fallback: emit directly if thread is unhealthy @sink << precompiled_datagram.to_datagram(value) return end @aggregation_state_mutex.synchronize do @aggregation_state[precompiled_datagram] ||= 0 @aggregation_state[precompiled_datagram] += value end end |
#aggregate_precompiled_timing_metric(precompiled_datagram, value = 1) ⇒ void
This method returns an undefined value.
Aggregates a precompiled metric that can be packed into a single datagram for later flushing.
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/statsd/instrument/aggregator.rb', line 189 def aggregate_precompiled_timing_metric(precompiled_datagram, value = 1) unless thread_healthcheck # Fallback: emit directly if thread is unhealthy @sink << precompiled_datagram.to_datagram(value) return end aggregation_state = nil @aggregation_state_mutex.synchronize do values = @aggregation_state[precompiled_datagram] ||= [] if values.size + 1 >= @max_values aggregation_state = @aggregation_state new_aggregation_state end values << value end do_flush(aggregation_state) if aggregation_state end |
#aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION, sample_rate: CONST_SAMPLE_RATE) ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/statsd/instrument/aggregator.rb', line 227 def aggregate_timing(name, value, tags: [], no_prefix: false, type: DISTRIBUTION, sample_rate: CONST_SAMPLE_RATE) unless thread_healthcheck @sink << datagram_builder(no_prefix: no_prefix).timing_value_packed( name, type.to_s, [value], sample_rate, ) return end = () key = packet_key(name, , no_prefix, type, sample_rate: sample_rate) aggregation_state = nil @aggregation_state_mutex.synchronize do values = @aggregation_state[key] ||= [] if values.size + 1 >= @max_values aggregation_state = @aggregation_state new_aggregation_state end values << value end do_flush(aggregation_state) if aggregation_state end |
#flush ⇒ Object
266 267 268 269 270 271 272 273 274 |
# File 'lib/statsd/instrument/aggregator.rb', line 266 def flush state = nil @aggregation_state_mutex.synchronize do state = @aggregation_state new_aggregation_state end do_flush(state) end |
#gauge(name, value, tags: [], no_prefix: false) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/statsd/instrument/aggregator.rb', line 252 def gauge(name, value, tags: [], no_prefix: false) unless thread_healthcheck @sink << datagram_builder(no_prefix: no_prefix).g(name, value, CONST_SAMPLE_RATE, ) return end = () key = packet_key(name, , no_prefix, GAUGE) @aggregation_state_mutex.synchronize do @aggregation_state[key] = value end end |
#increment(name, value = 1, tags: [], no_prefix: false, sample_rate: CONST_SAMPLE_RATE) ⇒ void
This method returns an undefined value.
Increment a counter by a given value and save it for later flushing.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/statsd/instrument/aggregator.rb', line 151 def increment(name, value = 1, tags: [], no_prefix: false, sample_rate: CONST_SAMPLE_RATE) unless thread_healthcheck @sink << datagram_builder(no_prefix: no_prefix).c(name, value, sample_rate, ) return end = () key = packet_key(name, , no_prefix, COUNT, sample_rate: sample_rate) @aggregation_state_mutex.synchronize do @aggregation_state[key] ||= 0 @aggregation_state[key] += value end end |