Class: Wurk::Metrics::Rollup
- Inherits: Object
  - Object
  - Wurk::Metrics::Rollup
- Includes: Component
- Defined in: lib/wurk/metrics/rollup.rb
Overview
Leader-only background thread that rolls the per-class minute buckets written by Wurk::Metrics::History (`j|YYMMDD|H:M`) up into compact, cluster-total time-series buckets that the dashboard “throughput” / “failures” charts read directly:
  jr|1m|<epoch>  HASH {p,f,ms}  TTL 24h  (1-minute resolution)
  jr|5m|<epoch>  HASH {p,f,ms}  TTL 7d   (5-minute resolution)
  jr|1h|<epoch>  HASH {p,f,ms}  TTL 30d  (1-hour resolution)
`<epoch>` is the UTC start-of-bucket as integer seconds. Totals are summed across every job class, so a 30-day chart reads ~720 small hashes instead of fanning out over ~43k per-class minute keys.
Every write is an idempotent HSET. Each tick recomputes the trailing few 1m buckets from the source minute hash, then recomputes the coarse buckets from their 1m children. Re-running a tick (a missed tick, a leadership change, a late metric write) converges to the same totals — it never double-counts. Storage is bounded purely by the per-bucket TTLs; see docs/metrics-history.md for the retention math.
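The overwrite-instead-of-increment idea above can be sketched as follows. This is a minimal illustration, not the class's actual implementation: a plain Hash stands in for Redis, and the names `STEPS`, `floor_to`, and `roll_coarse` are hypothetical helpers introduced only for this example.

```ruby
# Illustrative only: a Hash stands in for Redis, and STEPS, floor_to and
# roll_coarse are hypothetical names, not part of Wurk.
STEPS = { '1m' => 60, '5m' => 300, '1h' => 3600 }.freeze

# UTC start-of-bucket, as integer seconds, for a given epoch and step.
def floor_to(epoch, step)
  epoch - (epoch % step)
end

# Recompute one coarse bucket from its 1m children and overwrite it.
# Overwriting (rather than incrementing) is what makes a re-run safe.
def roll_coarse(store, bucket, epoch)
  step   = STEPS.fetch(bucket)
  start  = floor_to(epoch, step)
  totals = { 'p' => 0, 'f' => 0, 'ms' => 0 }
  (start...(start + step)).step(60) do |minute|
    child = store["jr|1m|#{minute}"]
    next unless child

    totals.each_key { |field| totals[field] += child.fetch(field, 0) }
  end
  store["jr|#{bucket}|#{start}"] = totals
end

store = {
  'jr|1m|1700000100' => { 'p' => 10, 'f' => 1, 'ms' => 500 },
  'jr|1m|1700000160' => { 'p' => 5,  'f' => 0, 'ms' => 200 }
}
first  = roll_coarse(store, '5m', 1_700_000_160)
second = roll_coarse(store, '5m', 1_700_000_160) # re-run converges to the same totals
puts first == second
```

Because each pass rebuilds the bucket from its children and replaces the stored value, running the same pass twice leaves the same totals in place.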
Constant Summary
- PREFIX =
  'jr'
- BUCKETS =
  bucket => [step_seconds, ttl_seconds]. The retention follows the issue’s spec: 1m kept 24h, 5m kept 7d, 1h kept 30d.
  { '1m' => [60, 24 * 60 * 60], '5m' => [300, 7 * 24 * 60 * 60], '1h' => [3600, 30 * 24 * 60 * 60] }.freeze
- COARSE =
  %w[5m 1h].freeze
- DEFAULT_TICK_SECONDS =
  60
- LOOKBACK_MINUTES =
  Re-roll the last N completed minutes from source on every tick (idempotent). This self-heals a leadership failover / restart or a late metric write up to N minutes old: the source `j|…` buckets live 3 days, so re-reading them folds the gap back in. Only outages longer than this leave a hole that ages out with the bucket TTL (best-effort metrics).
  15
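As a rough check on the retention figures, the maximum number of live keys per resolution can be derived from BUCKETS by dividing each TTL by its step. The snippet below copies the constant from this page so it runs on its own; `live_keys` is a name introduced here for illustration.

```ruby
# BUCKETS is copied from the constant above so this runs without Wurk loaded.
BUCKETS = {
  '1m' => [60, 24 * 60 * 60],
  '5m' => [300, 7 * 24 * 60 * 60],
  '1h' => [3600, 30 * 24 * 60 * 60]
}.freeze

# Upper bound on live keys per resolution: one bucket per step within the TTL.
live_keys = BUCKETS.transform_values { |(step, ttl)| ttl / step }

live_keys.each do |bucket, count|
  puts "#{bucket}: at most #{count} live buckets"
end
```

The 1h figure corresponds to the ~720 hashes a 30-day chart reads, as mentioned in the overview.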
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Component
Class Method Summary
Instance Method Summary
- #initialize(config) ⇒ Rollup (constructor)
  A new instance of Rollup.
- #roll(now = ::Time.now) ⇒ Object
  One rollup pass, bypassing the leader gate and the sleep loop.
- #start ⇒ Object
- #terminate ⇒ Object
- #tick(now: ::Time.now) ⇒ Object
  Leader-gated: only the elected leader writes the cluster-total series, so N workers don’t each HSET the same buckets every minute.
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config) ⇒ Rollup
Returns a new instance of Rollup.
    # File 'lib/wurk/metrics/rollup.rb', line 54
    def initialize(config)
      @config = config
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
      @thread = nil
    end
Class Method Details
.bucket_key(bucket, epoch) ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 50
    def self.bucket_key(bucket, epoch)
      "#{PREFIX}|#{bucket}|#{epoch}"
    end
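A short usage sketch for the key format follows. The module `RollupKeys` is a hypothetical stand-in that copies PREFIX and bucket_key from the listing above so the snippet runs without loading Wurk. The flooring step is an assumption based on the overview's description of `<epoch>` as the UTC start-of-bucket.

```ruby
# RollupKeys is a hypothetical stand-in that mirrors PREFIX and bucket_key
# from the listing above; it is not part of Wurk.
module RollupKeys
  PREFIX = 'jr'

  def self.bucket_key(bucket, epoch)
    "#{PREFIX}|#{bucket}|#{epoch}"
  end
end

now   = Time.utc(2024, 1, 1, 12, 34, 56).to_i
start = now - (now % 300) # floor to the enclosing 5-minute boundary (12:30:00 UTC)
key   = RollupKeys.bucket_key('5m', start)
puts key
```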
Instance Method Details
#roll(now = ::Time.now) ⇒ Object
One rollup pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “roll now” can drive it directly.
    # File 'lib/wurk/metrics/rollup.rb', line 92
    def roll(now = ::Time.now)
      cur_min = floor_min(now)
      minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
      minutes.each { |epoch_min| write_minute_bucket(epoch_min) }
      recompute_coarse(minutes)
      nil
    end
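To make the lookback window concrete, the sketch below reproduces only the `minutes` computation from the listing above. The private `floor_min` helper is not shown on this page, so the version here is an assumption (truncate to the start of the minute, in integer seconds).

```ruby
LOOKBACK_MINUTES = 15

# Assumed behavior of the private floor_min helper (not shown on this page):
# truncate a Time to the start of its minute, as integer epoch seconds.
def floor_min(time)
  seconds = time.to_i
  seconds - (seconds % 60)
end

now     = Time.utc(2024, 1, 1, 12, 34, 56)
cur_min = floor_min(now)
minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }

newest = Time.at(minutes.first).utc.strftime('%H:%M')
oldest = Time.at(minutes.last).utc.strftime('%H:%M')
puts "re-rolling #{minutes.size} minutes, from #{oldest} to #{newest}"
```

Under that assumption, the in-progress minute (12:34 in this example) is excluded, and the window covers the 15 completed minutes before it.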
#start ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 63
    def start
      @thread ||= safe_thread('metrics-rollup') do # rubocop:disable Naming/MemoizedInstanceVariableName
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 73
    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
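The start/terminate pair relies on a private `wait` method that is not shown on this page. One plausible shape for it is a timed wait on the condition variable, so that terminate can interrupt the sleep immediately. The sketch below is an assumption along those lines: `IntervalLoop` is a hypothetical class, it uses a plain Thread instead of `safe_thread`, and it joins the thread in terminate purely so the demo can measure shutdown time.

```ruby
# Hypothetical stand-in for the start/terminate pattern; not Wurk's code.
class IntervalLoop
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done     = false
    @mutex    = Mutex.new
    @sleeper  = ConditionVariable.new
    @ticks    = 0
    @thread   = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for the real tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join # added for the demo only; the listing above does not join
  end

  private

  # Assumed shape of the private wait: sleep up to @interval seconds, but
  # wake early when terminate signals. Checking @done under the mutex avoids
  # sleeping through a signal that arrived before the wait began.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

worker = IntervalLoop.new(60)
worker.start
sleep 0.05
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
worker.terminate
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('shutdown took %.3fs', elapsed)
```

With a 60-second interval, shutdown still returns promptly because the signal wakes the sleeping thread rather than waiting for the timeout to expire.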
#tick(now: ::Time.now) ⇒ Object
Leader-gated: only the elected leader writes the cluster-total series, so N workers don’t each HSET the same buckets every minute.
    # File 'lib/wurk/metrics/rollup.rb', line 82
    def tick(now: ::Time.now)
      return unless leader?

      roll(now)
    rescue StandardError => e
      handle_exception(e, { context: 'metrics-rollup' })
    end