Module: Wurk::Metrics::Query
- Defined in:
- lib/wurk/metrics/query.rb
Overview
Read-side for the per-class HASH bucket schema written by ‘Wurk::Metrics::History`. Backs the Web UI’s history pane and any external dashboarding code; both rely on the same HGETALL fan-out over a contiguous range of minute / hour keys.
Window caps are spec-enforced:
minutes ≤ 480 (8h — same as the 10-minute rollup retention)
hours ≤ 72 (3d — same as MID_TERM retention)
A wider window has no data to read anyway (the buckets are TTL’d out), so we fail loudly rather than silently returning sparse results.
Defined Under Namespace
Classes: WindowTooWide
Constant Summary collapse
- MAX_MINUTES =
rubocop:disable Metrics/ModuleLength
480- MAX_HOURS =
72
Class Method Summary collapse
- .accumulate!(totals, hash) ⇒ Object
- .aggregate_minutes(now, minutes) ⇒ Object
- .bucket_spec!(bucket) ⇒ Object
-
.bucket_starts(now, step, window) ⇒ Object
The last ‘window/step` step-aligned bucket starts, oldest→newest, so they match the keys the rollup writes.
- .cap_hours!(hours) ⇒ Object
- .cap_minutes!(minutes) ⇒ Object
- .check_window!(value, max, label) ⇒ Object
- .clamp_history_window!(window_seconds, ttl) ⇒ Object
- .floor_to(time, unit) ⇒ Object
-
.for_job(klass, minutes: nil, hours: nil, now: ::Time.now) ⇒ Object
Per-class time-series.
-
.history(bucket, window_seconds, now: ::Time.now) ⇒ Object
Cluster-total time-series for the dashboard throughput/failures charts, read from the compact buckets written by Wurk::Metrics::Rollup.
- .hour_series(klass, now, hours) ⇒ Object
- .hour_timestamps(now, hours) ⇒ Object
- .minute_keys(now, minutes) ⇒ Object
- .minute_series(klass, now, minutes) ⇒ Object
-
.minute_timestamps(now, minutes) ⇒ Object
Truncate to the minute so the bucket boundary matches what the writer used.
- .pipeline_hgetall(keys) ⇒ Object
- .pipeline_hmget(keys, fields) ⇒ Object
-
.top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now) ⇒ Object
Aggregate per-job-class totals over a recent window of minute buckets.
- .validate_for_job!(klass, minutes, hours) ⇒ Object
- .zip_rows(timestamps, rows) ⇒ Object
Class Method Details
.accumulate!(totals, hash) ⇒ Object
109 110 111 112 113 114 115 116 117 118 |
# File 'lib/wurk/metrics/query.rb', line 109 def accumulate!(totals, hash) return if hash.nil? || hash.empty? hash.each do |field, value| klass, kind = field.split('|', 2) next unless kind && TOTAL_FIELDS.include?(kind) totals[klass][kind.to_sym] += Integer(value) end end |
.aggregate_minutes(now, minutes) ⇒ Object
103 104 105 106 107 |
# File 'lib/wurk/metrics/query.rb', line 103 def aggregate_minutes(now, minutes) totals = ::Hash.new { |h, k| h[k] = { p: 0, f: 0, ms: 0 } } pipeline_hgetall(minute_keys(now, minutes)).each { |hash| accumulate!(totals, hash) } totals end |
.bucket_spec!(bucket) ⇒ Object
62 63 64 65 66 |
# File 'lib/wurk/metrics/query.rb', line 62 def bucket_spec!(bucket) Wurk::Metrics::Rollup::BUCKETS.fetch(bucket) do raise ArgumentError, "bucket must be one of #{Wurk::Metrics::Rollup::BUCKETS.keys.inspect}" end end |
.bucket_starts(now, step, window) ⇒ Object
The last ‘window/step` step-aligned bucket starts, oldest→newest, so they match the keys the rollup writes.
77 78 79 80 |
# File 'lib/wurk/metrics/query.rb', line 77 def bucket_starts(now, step, window) last = (now.to_i / step) * step (0...(window / step)).map { |i| last - (i * step) }.reverse end |
.cap_hours!(hours) ⇒ Object
92 93 94 |
# File 'lib/wurk/metrics/query.rb', line 92 def cap_hours!(hours) check_window!(Integer(hours), MAX_HOURS, 'hours') end |
.cap_minutes!(minutes) ⇒ Object
88 89 90 |
# File 'lib/wurk/metrics/query.rb', line 88 def cap_minutes!(minutes) check_window!(Integer(minutes), MAX_MINUTES, 'minutes') end |
.check_window!(value, max, label) ⇒ Object
96 97 98 99 100 101 |
# File 'lib/wurk/metrics/query.rb', line 96 def check_window!(value, max, label) raise ArgumentError, "#{label} must be positive" if value <= 0 raise WindowTooWide, "#{label} must be <= #{max} (got #{value})" if value > max value end |
.clamp_history_window!(window_seconds, ttl) ⇒ Object
68 69 70 71 72 73 |
# File 'lib/wurk/metrics/query.rb', line 68 def clamp_history_window!(window_seconds, ttl) window = Integer(window_seconds) raise ArgumentError, 'window must be positive' if window <= 0 [window, ttl].min end |
.floor_to(time, unit) ⇒ Object
152 153 154 155 156 157 158 |
# File 'lib/wurk/metrics/query.rb', line 152 def floor_to(time, unit) t = time.utc case unit when :min then ::Time.utc(t.year, t.month, t.day, t.hour, t.min) when :hour then ::Time.utc(t.year, t.month, t.day, t.hour) end end |
.for_job(klass, minutes: nil, hours: nil, now: ::Time.now) ⇒ Object
Per-class time-series. ‘minutes` reads the per-minute bucket; `hours` reads the per-class hourly bucket (separate keys per spec, so a long window doesn’t fan out over 4320 minute hashes).
45 46 47 48 |
# File 'lib/wurk/metrics/query.rb', line 45 def for_job(klass, minutes: nil, hours: nil, now: ::Time.now) validate_for_job!(klass, minutes, hours) minutes ? minute_series(klass, now, cap_minutes!(minutes)) : hour_series(klass, now, cap_hours!(hours)) end |
.history(bucket, window_seconds, now: ::Time.now) ⇒ Object
Cluster-total time-series for the dashboard throughput/failures charts, read from the compact buckets written by Wurk::Metrics::Rollup. ‘bucket` is ’1m’/‘5m’/‘1h’; ‘window_seconds` is clamped to that bucket’s retention. Returns ‘[p:, f:, ms:, …]` oldest→newest, gap-filled with zeros so a chart has a continuous x-axis.
55 56 57 58 59 60 |
# File 'lib/wurk/metrics/query.rb', line 55 def history(bucket, window_seconds, now: ::Time.now) step, ttl = bucket_spec!(bucket) starts = bucket_starts(now, step, clamp_history_window!(window_seconds, ttl)) rows = pipeline_hmget(starts.map { |s| Wurk::Metrics::Rollup.bucket_key(bucket, s) }, %w[p f ms]) starts.zip(rows).map { |at, (p, f, ms)| { at: at, p: p.to_i, f: f.to_i, ms: ms.to_i } } end |
.hour_series(klass, now, hours) ⇒ Object
125 126 127 128 129 |
# File 'lib/wurk/metrics/query.rb', line 125 def hour_series(klass, now, hours) = (now, hours) keys = .map { |t| Wurk::Metrics::History.hour_key(klass, t) } zip_rows(, pipeline_hmget(keys, %w[p f ms])) end |
.hour_timestamps(now, hours) ⇒ Object
147 148 149 150 |
# File 'lib/wurk/metrics/query.rb', line 147 def (now, hours) floor = floor_to(now, :hour) (0...hours).map { |i| floor - (i * 3600) }.reverse end |
.minute_keys(now, minutes) ⇒ Object
135 136 137 |
# File 'lib/wurk/metrics/query.rb', line 135 def minute_keys(now, minutes) (now, minutes).map { |t| Wurk::Metrics::History.minute_key(t) } end |
.minute_series(klass, now, minutes) ⇒ Object
120 121 122 123 |
# File 'lib/wurk/metrics/query.rb', line 120 def minute_series(klass, now, minutes) rows = pipeline_hmget(minute_keys(now, minutes), %W[#{klass}|p #{klass}|f #{klass}|ms]) zip_rows((now, minutes), rows) end |
.minute_timestamps(now, minutes) ⇒ Object
Truncate to the minute so the bucket boundary matches what the writer used. Fractional-second drift would otherwise pull in an unrelated minute on the edge of the window.
142 143 144 145 |
# File 'lib/wurk/metrics/query.rb', line 142 def (now, minutes) floor = floor_to(now, :min) (0...minutes).map { |i| floor - (i * 60) }.reverse end |
.pipeline_hgetall(keys) ⇒ Object
160 161 162 163 164 |
# File 'lib/wurk/metrics/query.rb', line 160 def pipeline_hgetall(keys) return [] if keys.empty? Wurk.redis { |c| c.pipelined { |p| keys.each { |k| p.call('HGETALL', k) } } } end |
.pipeline_hmget(keys, fields) ⇒ Object
166 167 168 169 170 |
# File 'lib/wurk/metrics/query.rb', line 166 def pipeline_hmget(keys, fields) return [] if keys.empty? Wurk.redis { |c| c.pipelined { |p| keys.each { |k| p.call('HMGET', k, *fields) } } } end |
.top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now) ⇒ Object
Aggregate per-job-class totals over a recent window of minute buckets. Returns array of ‘[class_name, f:, ms:]` tuples sorted by volume (p + f) descending so the UI’s “top jobs” table renders without a second sort pass.
34 35 36 37 38 39 40 |
# File 'lib/wurk/metrics/query.rb', line 34 def top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now) minutes = hours * 60 if hours cap_hours!(hours) if hours rows = aggregate_minutes(now, cap_minutes!(minutes)).to_a rows = rows.select { |(k, _)| k.start_with?(class_filter) } if class_filter && !class_filter.empty? rows.sort_by { |(_k, s)| -(s[:p] + s[:f]) } end |
.validate_for_job!(klass, minutes, hours) ⇒ Object
82 83 84 85 86 |
# File 'lib/wurk/metrics/query.rb', line 82 def validate_for_job!(klass, minutes, hours) raise ArgumentError, 'klass required' if klass.nil? || klass.empty? raise ArgumentError, 'pass exactly one of minutes: or hours:' if minutes && hours raise ArgumentError, 'pass minutes: or hours:' if minutes.nil? && hours.nil? end |
.zip_rows(timestamps, rows) ⇒ Object
131 132 133 |
# File 'lib/wurk/metrics/query.rb', line 131 def zip_rows(, rows) .zip(rows).map { |at, (p, f, ms)| { at: at, p: p.to_i, f: f.to_i, ms: ms.to_i } } end |