Class: Wurk::Metrics::Rollup

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/metrics/rollup.rb

Overview

Leader-only background thread that rolls the per-class minute buckets written by Wurk::Metrics::History (`j|YYMMDD|H:M`) up into compact, cluster-total time-series buckets that the dashboard “throughput” / “failures” charts read directly:

jr|1m|<epoch>   HASH {p,f,ms}   TTL 24h   (1-minute resolution)
jr|5m|<epoch>   HASH {p,f,ms}   TTL 7d    (5-minute resolution)
jr|1h|<epoch>   HASH {p,f,ms}   TTL 30d   (1-hour resolution)

`<epoch>` is the UTC start-of-bucket as integer seconds. Totals are summed across every job class, so a 30-day chart reads ~720 small hashes instead of fanning out over ~43k per-class minute keys.
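The read counts quoted above follow from the bucket widths alone. A minimal arithmetic check, assuming nothing beyond the step sizes and TTLs in the list above (`buckets_in` is a hypothetical helper, not part of the class):

```ruby
SECONDS_PER_DAY = 24 * 60 * 60

# Number of buckets of width `step_seconds` needed to cover `days` days.
def buckets_in(days, step_seconds)
  (days * SECONDS_PER_DAY) / step_seconds
end

puts buckets_in(30, 3600) # 720   one-hour hashes behind a 30-day chart
puts buckets_in(30, 60)   # 43200 minute-resolution keys for the same window

# The same helper gives the most keys each resolution can hold at once,
# since every bucket expires after its TTL.
puts buckets_in(1, 60)    # 1440  1m buckets kept 24h
puts buckets_in(7, 300)   # 2016  5m buckets kept 7d
```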

Every write is an idempotent HSET. Each tick recomputes the trailing few 1m buckets from the source minute hash, then recomputes the coarse buckets from their 1m children. Re-running a tick (a missed tick, a leadership change, a late metric write) converges to the same totals — it never double-counts. Storage is bounded purely by the per-bucket TTLs; see docs/metrics-history.md for the retention math.
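The convergence property comes from overwriting each bucket with a freshly recomputed total rather than incrementing it. The sketch below illustrates that idea with plain Ruby hashes standing in for Redis. `roll_minute`, the job class names, and the sample numbers are made up for illustration, and the fields `p`, `f`, `ms` are treated as opaque counters.

```ruby
FIELDS = %i[p f ms].freeze

# Recompute the cluster total for one minute from the per-class source data
# and overwrite the bucket, the way HSET overwrites hash fields.
def roll_minute(store, key, per_class)
  store[key] = FIELDS.to_h { |f| [f, per_class.each_value.sum { |m| m[f] }] }
end

per_class = {
  'MailJob'  => { p: 10, f: 1, ms: 500 },
  'ThumbJob' => { p: 4,  f: 0, ms: 1200 }
}
store = {}
key = 'jr|1m|1704112440'

roll_minute(store, key, per_class)
first = store[key].dup
roll_minute(store, key, per_class) # re-run: totals are unchanged
puts store[key] == first           # true

per_class['MailJob'][:p] += 2      # a late metric write lands in the source
roll_minute(store, key, per_class) # the next pass folds it in
puts store[key][:p]                # 16
```

An increment-based write (HINCRBY-style) would have counted the first 14 jobs twice on the re-run. Recomputing from source avoids that, at the cost of re-reading the source minutes on every pass.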

Constant Summary

PREFIX =
'jr'
BUCKETS =

bucket => [step_seconds, ttl_seconds]. The retention is the issue’s spec: 1m kept 24h, 5m kept 7d, 1h kept 30d.

{
  '1m' => [60,   24 * 60 * 60],
  '5m' => [300,  7 * 24 * 60 * 60],
  '1h' => [3600, 30 * 24 * 60 * 60]
}.freeze
COARSE =
%w[5m 1h].freeze
DEFAULT_TICK_SECONDS =
60
LOOKBACK_MINUTES =

Re-roll the last N completed minutes from source on every tick (idempotent). This self-heals a leadership failover / restart or a late metric write up to N minutes old: the source `j|…` buckets live 3 days, so re-reading them folds the gap back in. Only outages longer than this leave a hole that ages out with the bucket TTL (best-effort metrics).

15

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Class Method Summary

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ Rollup

Returns a new instance of Rollup.



# File 'lib/wurk/metrics/rollup.rb', line 54

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
  @thread = nil
end

Class Method Details

.bucket_key(bucket, epoch) ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 50

def self.bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end
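As written, .bucket_key only interpolates its arguments, so the caller is expected to pass an epoch that is already floored to the start of the bucket. A small usage sketch follows. The method body is copied from the listing above so the snippet runs without the gem loaded, and `bucket_start` is a hypothetical helper.

```ruby
PREFIX = 'jr'

# Same body as Wurk::Metrics::Rollup.bucket_key.
def bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end

# Hypothetical helper: floor a Time to the start of its bucket (UTC seconds).
def bucket_start(time, step_seconds)
  time.to_i - (time.to_i % step_seconds)
end

t = Time.utc(2024, 1, 1, 12, 34, 56)
puts bucket_key('1m', bucket_start(t, 60))   # jr|1m|1704112440
puts bucket_key('5m', bucket_start(t, 300))  # jr|5m|1704112200
puts bucket_key('1h', bucket_start(t, 3600)) # jr|1h|1704110400
```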

Instance Method Details

#roll(now = ::Time.now) ⇒ Object

One rollup pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “roll now” can drive it directly.



# File 'lib/wurk/metrics/rollup.rb', line 92

def roll(now = ::Time.now)
  cur_min = floor_min(now)
  minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
  minutes.each { |epoch_min| write_minute_bucket(epoch_min) }
  recompute_coarse(minutes)
  nil
end
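To see which buckets a single pass touches, the window arithmetic from #roll can be reproduced in isolation. This is a sketch under assumptions: `floor_min` is taken to truncate to the minute, and `coarse_parents` guesses that a coarse bucket's epoch is the minute floored to the coarse step. The private helpers `floor_min` and `recompute_coarse` are not shown on this page.

```ruby
LOOKBACK_MINUTES = 15

# Assumed behaviour of the private floor_min helper: truncate to the minute.
def floor_min(time)
  time.to_i - (time.to_i % 60)
end

# The completed minutes one pass re-rolls, newest first (mirrors #roll).
def lookback_minutes(now)
  cur_min = floor_min(now)
  (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
end

# Assumption: a coarse parent is the minute floored to the coarse step.
def coarse_parents(minutes, step_seconds)
  minutes.map { |m| m - (m % step_seconds) }.uniq
end

now = Time.utc(2024, 1, 1, 12, 34, 56)
minutes = lookback_minutes(now)

puts minutes.first - floor_min(now)         # -60 (the in-progress minute is skipped)
puts minutes.length                         # 15
puts coarse_parents(minutes, 300).length    # 4 (12:15, 12:20, 12:25, 12:30)
puts coarse_parents(minutes, 3600).length   # 1 (12:00)
```

The window starts one minute back because the current minute is still receiving writes. Only completed minutes are rolled.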

#start ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 63

def start
  @thread ||= safe_thread('metrics-rollup') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 73

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
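The #start / #terminate pair relies on a private `wait` that is not shown on this page. One plausible shape for it is a timed wait on the condition variable, so that #terminate can wake the thread immediately instead of waiting out the tick interval. The class below is a hypothetical stand-in, not the library's implementation: it uses a plain Thread where the real class uses Component#safe_thread, counts ticks instead of rolling, and joins in #terminate only to keep the demo deterministic.

```ruby
class TickLoop
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for #tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join # demo only; the original #terminate does not join
  end

  private

  # Sleep up to @interval seconds, returning early if #terminate signals.
  # The @done check under the mutex avoids sleeping after a missed signal.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

runner = TickLoop.new(60)
runner.start
sleep 0.05
runner.terminate  # returns promptly, well before the 60s interval elapses
puts runner.ticks # 0
```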

#tick(now: ::Time.now) ⇒ Object

Leader-gated: only the elected leader writes the cluster-total series, so N workers don’t each HSET the same buckets every minute.



# File 'lib/wurk/metrics/rollup.rb', line 82

def tick(now: ::Time.now)
  return unless leader?

  roll(now)
rescue StandardError => e
  handle_exception(e, { context: 'metrics-rollup' })
end