Module: Wurk::Metrics::Query

Defined in:
lib/wurk/metrics/query.rb

Overview

Read-side for the per-class HASH bucket schema written by ‘Wurk::Metrics::History`. Backs the Web UI’s history pane and any external dashboarding code; both rely on the same HGETALL fan-out over a contiguous range of minute / hour keys.

Window caps are spec-enforced:

minutes ≤ 480  (8h — same as the 10-minute rollup retention)
hours   ≤ 72   (3d — same as MID_TERM retention)

A wider window has no data to read anyway (the buckets are TTL’d out), so we fail loudly rather than silently returning sparse results.

Defined Under Namespace

Classes: WindowTooWide

Constant Summary collapse

MAX_MINUTES =

rubocop:disable Metrics/ModuleLength

480
MAX_HOURS =
72

Class Method Summary collapse

Class Method Details

.accumulate!(totals, hash) ⇒ Object



109
110
111
112
113
114
115
116
117
118
# File 'lib/wurk/metrics/query.rb', line 109

def accumulate!(totals, hash)
  return if hash.nil? || hash.empty?

  hash.each do |field, value|
    klass, kind = field.split('|', 2)
    next unless kind && TOTAL_FIELDS.include?(kind)

    totals[klass][kind.to_sym] += Integer(value)
  end
end

.aggregate_minutes(now, minutes) ⇒ Object



103
104
105
106
107
# File 'lib/wurk/metrics/query.rb', line 103

def aggregate_minutes(now, minutes)
  totals = ::Hash.new { |h, k| h[k] = { p: 0, f: 0, ms: 0 } }
  pipeline_hgetall(minute_keys(now, minutes)).each { |hash| accumulate!(totals, hash) }
  totals
end

.bucket_spec!(bucket) ⇒ Object



62
63
64
65
66
# File 'lib/wurk/metrics/query.rb', line 62

def bucket_spec!(bucket)
  Wurk::Metrics::Rollup::BUCKETS.fetch(bucket) do
    raise ArgumentError, "bucket must be one of #{Wurk::Metrics::Rollup::BUCKETS.keys.inspect}"
  end
end

.bucket_starts(now, step, window) ⇒ Object

The last ‘window/step` step-aligned bucket starts, oldest→newest, so they match the keys the rollup writes.



77
78
79
80
# File 'lib/wurk/metrics/query.rb', line 77

def bucket_starts(now, step, window)
  last = (now.to_i / step) * step
  (0...(window / step)).map { |i| last - (i * step) }.reverse
end

.cap_hours!(hours) ⇒ Object



92
93
94
# File 'lib/wurk/metrics/query.rb', line 92

def cap_hours!(hours)
  check_window!(Integer(hours), MAX_HOURS, 'hours')
end

.cap_minutes!(minutes) ⇒ Object



88
89
90
# File 'lib/wurk/metrics/query.rb', line 88

def cap_minutes!(minutes)
  check_window!(Integer(minutes), MAX_MINUTES, 'minutes')
end

.check_window!(value, max, label) ⇒ Object

Raises:

  • (ArgumentError)


96
97
98
99
100
101
# File 'lib/wurk/metrics/query.rb', line 96

def check_window!(value, max, label)
  raise ArgumentError, "#{label} must be positive" if value <= 0
  raise WindowTooWide, "#{label} must be <= #{max} (got #{value})" if value > max

  value
end

.clamp_history_window!(window_seconds, ttl) ⇒ Object

Raises:

  • (ArgumentError)


68
69
70
71
72
73
# File 'lib/wurk/metrics/query.rb', line 68

def clamp_history_window!(window_seconds, ttl)
  window = Integer(window_seconds)
  raise ArgumentError, 'window must be positive' if window <= 0

  [window, ttl].min
end

.floor_to(time, unit) ⇒ Object



152
153
154
155
156
157
158
# File 'lib/wurk/metrics/query.rb', line 152

def floor_to(time, unit)
  t = time.utc
  case unit
  when :min  then ::Time.utc(t.year, t.month, t.day, t.hour, t.min)
  when :hour then ::Time.utc(t.year, t.month, t.day, t.hour)
  end
end

.for_job(klass, minutes: nil, hours: nil, now: ::Time.now) ⇒ Object

Per-class time-series. ‘minutes` reads the per-minute bucket; `hours` reads the per-class hourly bucket (separate keys per spec, so a long window doesn’t fan out over 4320 minute hashes).



45
46
47
48
# File 'lib/wurk/metrics/query.rb', line 45

def for_job(klass, minutes: nil, hours: nil, now: ::Time.now)
  validate_for_job!(klass, minutes, hours)
  minutes ? minute_series(klass, now, cap_minutes!(minutes)) : hour_series(klass, now, cap_hours!(hours))
end

.history(bucket, window_seconds, now: ::Time.now) ⇒ Object

Cluster-total time-series for the dashboard throughput/failures charts, read from the compact buckets written by Wurk::Metrics::Rollup. ‘bucket` is ’1m’/‘5m’/‘1h’; ‘window_seconds` is clamped to that bucket’s retention. Returns ‘[p:, f:, ms:, …]` oldest→newest, gap-filled with zeros so a chart has a continuous x-axis.



55
56
57
58
59
60
# File 'lib/wurk/metrics/query.rb', line 55

def history(bucket, window_seconds, now: ::Time.now)
  step, ttl = bucket_spec!(bucket)
  starts = bucket_starts(now, step, clamp_history_window!(window_seconds, ttl))
  rows = pipeline_hmget(starts.map { |s| Wurk::Metrics::Rollup.bucket_key(bucket, s) }, %w[p f ms])
  starts.zip(rows).map { |at, (p, f, ms)| { at: at, p: p.to_i, f: f.to_i, ms: ms.to_i } }
end

.hour_series(klass, now, hours) ⇒ Object



125
126
127
128
129
# File 'lib/wurk/metrics/query.rb', line 125

def hour_series(klass, now, hours)
  timestamps = hour_timestamps(now, hours)
  keys = timestamps.map { |t| Wurk::Metrics::History.hour_key(klass, t) }
  zip_rows(timestamps, pipeline_hmget(keys, %w[p f ms]))
end

.hour_timestamps(now, hours) ⇒ Object



147
148
149
150
# File 'lib/wurk/metrics/query.rb', line 147

def hour_timestamps(now, hours)
  floor = floor_to(now, :hour)
  (0...hours).map { |i| floor - (i * 3600) }.reverse
end

.minute_keys(now, minutes) ⇒ Object



135
136
137
# File 'lib/wurk/metrics/query.rb', line 135

def minute_keys(now, minutes)
  minute_timestamps(now, minutes).map { |t| Wurk::Metrics::History.minute_key(t) }
end

.minute_series(klass, now, minutes) ⇒ Object



120
121
122
123
# File 'lib/wurk/metrics/query.rb', line 120

def minute_series(klass, now, minutes)
  rows = pipeline_hmget(minute_keys(now, minutes), %W[#{klass}|p #{klass}|f #{klass}|ms])
  zip_rows(minute_timestamps(now, minutes), rows)
end

.minute_timestamps(now, minutes) ⇒ Object

Truncate to the minute so the bucket boundary matches what the writer used. Fractional-second drift would otherwise pull in an unrelated minute on the edge of the window.



142
143
144
145
# File 'lib/wurk/metrics/query.rb', line 142

def minute_timestamps(now, minutes)
  floor = floor_to(now, :min)
  (0...minutes).map { |i| floor - (i * 60) }.reverse
end

.pipeline_hgetall(keys) ⇒ Object



160
161
162
163
164
# File 'lib/wurk/metrics/query.rb', line 160

def pipeline_hgetall(keys)
  return [] if keys.empty?

  Wurk.redis { |c| c.pipelined { |p| keys.each { |k| p.call('HGETALL', k) } } }
end

.pipeline_hmget(keys, fields) ⇒ Object



166
167
168
169
170
# File 'lib/wurk/metrics/query.rb', line 166

def pipeline_hmget(keys, fields)
  return [] if keys.empty?

  Wurk.redis { |c| c.pipelined { |p| keys.each { |k| p.call('HMGET', k, *fields) } } }
end

.top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now) ⇒ Object

Aggregate per-job-class totals over a recent window of minute buckets. Returns array of ‘[class_name, f:, ms:]` tuples sorted by volume (p + f) descending so the UI’s “top jobs” table renders without a second sort pass.



34
35
36
37
38
39
40
# File 'lib/wurk/metrics/query.rb', line 34

def top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now)
  minutes = hours * 60 if hours
  cap_hours!(hours) if hours
  rows = aggregate_minutes(now, cap_minutes!(minutes)).to_a
  rows = rows.select { |(k, _)| k.start_with?(class_filter) } if class_filter && !class_filter.empty?
  rows.sort_by { |(_k, s)| -(s[:p] + s[:f]) }
end

.validate_for_job!(klass, minutes, hours) ⇒ Object

Raises:

  • (ArgumentError)


82
83
84
85
86
# File 'lib/wurk/metrics/query.rb', line 82

def validate_for_job!(klass, minutes, hours)
  raise ArgumentError, 'klass required' if klass.nil? || klass.empty?
  raise ArgumentError, 'pass exactly one of minutes: or hours:' if minutes && hours
  raise ArgumentError, 'pass minutes: or hours:' if minutes.nil? && hours.nil?
end

.zip_rows(timestamps, rows) ⇒ Object



131
132
133
# File 'lib/wurk/metrics/query.rb', line 131

def zip_rows(timestamps, rows)
  timestamps.zip(rows).map { |at, (p, f, ms)| { at: at, p: p.to_i, f: f.to_i, ms: ms.to_i } }
end