Module: Wurk::Metrics::Query

Defined in:
lib/wurk/metrics/query.rb

Overview

Read-side for the per-class HASH bucket schema written by `Wurk::Metrics::History`. Backs the Web UI's history pane and any external dashboarding code; both rely on the same HGETALL fan-out over a contiguous range of minute / hour keys.

Window caps are spec-enforced:

minutes ≤ 480  (8h — same as the 10-minute rollup retention)
hours   ≤ 72   (3d — same as MID_TERM retention)

A wider window has no data to read anyway (the buckets are TTL’d out), so we fail loudly rather than silently returning sparse results.

Defined Under Namespace

Classes: WindowTooWide

Constant Summary collapse

MAX_MINUTES =

rubocop:disable Metrics/ModuleLength

480
MAX_HOURS =
72
MAX_QUEUE_SERIES =
25

Class Method Summary collapse

Class Method Details

.accumulate!(totals, hash) ⇒ Object



148
149
150
151
152
153
154
155
156
157
# File 'lib/wurk/metrics/query.rb', line 148

# Folds one bucket reply into the running per-class totals.
#
# Field names have the form "<class>|<kind>"; only kinds listed in
# TOTAL_FIELDS are summed, every other field in the bucket is skipped.
#
# @param totals [Hash] accumulator indexed by class name; `totals[klass]`
#   must yield a Hash with an Integer under each counted kind (the caller in
#   this module passes a Hash whose default block builds `{ p: 0, f: 0, ms: 0 }`)
# @param hash [Hash, Array, nil] one bucket reply; a flat
#   `[field, value, field, value, ...]` Array is accepted too, because
#   queue_bucket_hashes treats Array replies from the same pipeline that way
# @return [void]
# @raise [ArgumentError, TypeError] when a counted value is not an integer
def accumulate!(totals, hash)
  return if hash.nil? || hash.empty?

  # Pair up a flat Array reply so both shapes iterate as (field, value).
  pairs = hash.is_a?(::Array) ? hash.each_slice(2) : hash
  pairs.each do |field, value|
    klass, kind = field.split('|', 2)
    next unless kind && TOTAL_FIELDS.include?(kind)

    totals[klass][kind.to_sym] += Integer(value)
  end
end

.aggregate_minutes(now, minutes) ⇒ Object



142
143
144
145
146
# File 'lib/wurk/metrics/query.rb', line 142

# Sums every minute bucket in the window into one totals Hash.
#
# @param now [Time] newest edge of the window (passed through to minute_keys)
# @param minutes [Integer] number of minute buckets to read
# @return [Hash] class name => `{ p:, f:, ms: }`; unknown classes default to
#   all-zero counters because of the Hash's default block
def aggregate_minutes(now, minutes)
  buckets = pipeline_hgetall(minute_keys(now, minutes))
  zeroed = ::Hash.new { |memo, klass| memo[klass] = { p: 0, f: 0, ms: 0 } }
  buckets.each_with_object(zeroed) do |bucket, totals|
    accumulate!(totals, bucket)
  end
end

.bucket_spec!(bucket) ⇒ Object



101
102
103
104
105
# File 'lib/wurk/metrics/query.rb', line 101

# Looks up the rollup spec registered for +bucket+.
#
# @param bucket [Object] key into Wurk::Metrics::Rollup::BUCKETS
# @return [Object] the stored spec (callers in this module destructure it
#   as `step, ttl`)
# @raise [ArgumentError] when the bucket is not registered
def bucket_spec!(bucket)
  specs = Wurk::Metrics::Rollup::BUCKETS
  specs.fetch(bucket) { raise ArgumentError, "bucket must be one of #{specs.keys.inspect}" }
end

.bucket_starts(now, step, window) ⇒ Object

The last `window/step` step-aligned bucket starts, oldest→newest, so they match the keys the rollup writes.



116
117
118
119
# File 'lib/wurk/metrics/query.rb', line 116

# Lists the step-aligned bucket start times covering the window.
#
# @param now [#to_i] newest instant; floored to a multiple of +step+
# @param step [Integer] bucket width in seconds
# @param window [Integer] window length in seconds; `window / step` buckets
#   are produced (none when the window is shorter than one step)
# @return [Array<Integer>] epoch seconds, oldest first
def bucket_starts(now, step, window)
  newest = (now.to_i / step) * step
  count = window / step
  offsets = (0...count).map { |n| n * step }
  offsets.reverse.map { |back| newest - back }
end

.cap_hours!(hours) ⇒ Object



131
132
133
# File 'lib/wurk/metrics/query.rb', line 131

# Coerces +hours+ to an Integer and checks it against MAX_HOURS.
#
# @param hours [Object] anything Kernel#Integer accepts
# @return [Object] whatever check_window! returns for the coerced value
# @raise [ArgumentError, TypeError] from Kernel#Integer on bad input, plus
#   anything check_window! raises
def cap_hours!(hours)
  requested = Integer(hours)
  check_window!(requested, MAX_HOURS, 'hours')
end

.cap_minutes!(minutes) ⇒ Object



127
128
129
# File 'lib/wurk/metrics/query.rb', line 127

# Coerces +minutes+ to an Integer and checks it against MAX_MINUTES.
#
# @param minutes [Object] anything Kernel#Integer accepts
# @return [Object] whatever check_window! returns for the coerced value
# @raise [ArgumentError, TypeError] from Kernel#Integer on bad input, plus
#   anything check_window! raises
def cap_minutes!(minutes)
  requested = Integer(minutes)
  check_window!(requested, MAX_MINUTES, 'minutes')
end

.check_window!(value, max, label) ⇒ Object

Raises:

  • (ArgumentError)


135
136
137
138
139
140
# File 'lib/wurk/metrics/query.rb', line 135

# Validates that +value+ lies in 1..max and hands it back unchanged.
#
# @param value [Integer] requested window size
# @param max [Integer] inclusive upper bound
# @param label [String] unit name interpolated into the error messages
# @return [Integer] +value+
# @raise [ArgumentError] when +value+ is zero or negative
# @raise [WindowTooWide] when +value+ exceeds +max+
def check_window!(value, max, label)
  if value <= 0
    raise ArgumentError, "#{label} must be positive"
  elsif value > max
    raise WindowTooWide, "#{label} must be <= #{max} (got #{value})"
  end

  value
end

.clamp_history_window!(window_seconds, ttl) ⇒ Object

Raises:

  • (ArgumentError)


107
108
109
110
111
112
# File 'lib/wurk/metrics/query.rb', line 107

# Coerces the requested window to an Integer and limits it to +ttl+.
#
# Unlike check_window!, an oversized window is shrunk rather than rejected.
#
# @param window_seconds [Object] anything Kernel#Integer accepts
# @param ttl [Integer] upper bound in seconds
# @return [Integer] the smaller of the coerced window and +ttl+
# @raise [ArgumentError] when the window is zero or negative
def clamp_history_window!(window_seconds, ttl)
  requested = Integer(window_seconds)
  raise ArgumentError, 'window must be positive' unless requested.positive?

  [requested, ttl].min
end

.floor_to(time, unit) ⇒ Object



191
192
193
194
195
196
197
# File 'lib/wurk/metrics/query.rb', line 191

# Truncates +time+ to the start of its minute or hour, in UTC.
#
# @param time [Time] instant to truncate; left untouched
# @param unit [Symbol] :min or :hour
# @return [Time] a new UTC Time at the start of the unit
# @raise [ArgumentError] when +unit+ is neither :min nor :hour
def floor_to(time, unit)
  # getutc returns a copy; Time#utc would convert the caller's object in
  # place (and raise FrozenError on a frozen, non-UTC Time).
  t = time.getutc
  case unit
  when :min  then ::Time.utc(t.year, t.month, t.day, t.hour, t.min)
  when :hour then ::Time.utc(t.year, t.month, t.day, t.hour)
  else raise ArgumentError, "unit must be :min or :hour (got #{unit.inspect})"
  end
end

.for_job(klass, minutes: nil, hours: nil, now: ::Time.now) ⇒ Object

Per-class time-series. `minutes` reads the per-minute bucket; `hours` reads the per-class hourly bucket (separate keys per spec, so a long window doesn't fan out over 4320 minute hashes).



47
48
49
50
# File 'lib/wurk/metrics/query.rb', line 47

# Per-class time series over either a minute window or an hour window.
#
# @param klass [String] job class name (must be non-empty)
# @param minutes [Object, nil] window in minutes; takes the minute path
# @param hours [Object, nil] window in hours; used when +minutes+ is not given
# @param now [Time] newest edge of the window
# @return [Object] the result of minute_series or hour_series
# @raise [ArgumentError] from validate_for_job! or the cap helpers
def for_job(klass, minutes: nil, hours: nil, now: ::Time.now)
  validate_for_job!(klass, minutes, hours)
  return minute_series(klass, now, cap_minutes!(minutes)) if minutes

  hour_series(klass, now, cap_hours!(hours))
end

.history(bucket, window_seconds, now: ::Time.now) ⇒ Object

Cluster-total time-series for the dashboard throughput/failures charts, read from the compact buckets written by Wurk::Metrics::Rollup. `bucket` is '1m'/'5m'/'1h'; `window_seconds` is clamped to that bucket's retention. Returns `[{at:, p:, f:, ms:}, …]` oldest→newest, gap-filled with zeros so a chart has a continuous x-axis.



57
58
59
60
61
62
# File 'lib/wurk/metrics/query.rb', line 57

# Cluster-total time series read from the Wurk::Metrics::Rollup buckets.
#
# @param bucket [Object] key accepted by bucket_spec!
# @param window_seconds [Object] requested window; clamped to the bucket's ttl
# @param now [Time] newest edge of the window
# @return [Array<Hash>] one `{ at:, p:, f:, ms: }` row per bucket start,
#   oldest first; fields missing in Redis come back as 0
# @raise [ArgumentError] for an unknown bucket or a non-positive window
def history(bucket, window_seconds, now: ::Time.now)
  step, ttl = bucket_spec!(bucket)
  starts = bucket_starts(now, step, clamp_history_window!(window_seconds, ttl))
  keys = starts.map { |start| Wurk::Metrics::Rollup.bucket_key(bucket, start) }
  # Row shaping is shared with the per-class series via zip_rows.
  zip_rows(starts, pipeline_hmget(keys, %w[p f ms]))
end

.hour_series(klass, now, hours) ⇒ Object



164
165
166
167
168
# File 'lib/wurk/metrics/query.rb', line 164

# Hourly p/f/ms series for one job class.
#
# @param klass [String] job class name used to build each hour key
# @param now [Time] newest edge of the window
# @param hours [Integer] number of hour buckets to read
# @return [Object] the rows produced by zip_rows, one per hour timestamp
def hour_series(klass, now, hours)
  stamps = hour_timestamps(now, hours)
  rows = pipeline_hmget(stamps.map { |stamp| Wurk::Metrics::History.hour_key(klass, stamp) }, %w[p f ms])
  zip_rows(stamps, rows)
end

.hour_timestamps(now, hours) ⇒ Object



186
187
188
189
# File 'lib/wurk/metrics/query.rb', line 186

# Hour-aligned timestamps for the window, oldest first.
#
# @param now [Time] newest instant; floored to its hour via floor_to
# @param hours [Integer] number of timestamps to produce
# @return [Array<Time>] +hours+ entries spaced 3600 seconds apart
def hour_timestamps(now, hours)
  newest = floor_to(now, :hour)
  offsets = (0...hours).map { |n| n * 3600 }
  offsets.reverse.map { |back| newest - back }
end

.minute_keys(now, minutes) ⇒ Object



174
175
176
# File 'lib/wurk/metrics/query.rb', line 174

# Redis keys of the minute buckets in the window, oldest first.
#
# @param now [Time] newest edge of the window
# @param minutes [Integer] number of minute buckets
# @return [Array] one Wurk::Metrics::History.minute_key result per timestamp
def minute_keys(now, minutes)
  stamps = minute_timestamps(now, minutes)
  stamps.map { |stamp| Wurk::Metrics::History.minute_key(stamp) }
end

.minute_series(klass, now, minutes) ⇒ Object



159
160
161
162
# File 'lib/wurk/metrics/query.rb', line 159

# Per-minute p/f/ms series for one job class.
#
# Reads the "<klass>|p", "<klass>|f" and "<klass>|ms" fields from every
# minute bucket in the window.
#
# @param klass [String] job class name used as the field prefix
# @param now [Time] newest edge of the window
# @param minutes [Integer] number of minute buckets to read
# @return [Object] the rows produced by zip_rows, one per minute timestamp
def minute_series(klass, now, minutes)
  fields = %w[p f ms].map { |kind| "#{klass}|#{kind}" }
  rows = pipeline_hmget(minute_keys(now, minutes), fields)
  zip_rows(minute_timestamps(now, minutes), rows)
end

.minute_timestamps(now, minutes) ⇒ Object

Truncate to the minute so the bucket boundary matches what the writer used. Fractional-second drift would otherwise pull in an unrelated minute on the edge of the window.



181
182
183
184
# File 'lib/wurk/metrics/query.rb', line 181

# Minute-aligned timestamps for the window, oldest first.
#
# @param now [Time] newest instant; floored to its minute via floor_to
# @param minutes [Integer] number of timestamps to produce
# @return [Array<Time>] +minutes+ entries spaced 60 seconds apart
def minute_timestamps(now, minutes)
  newest = floor_to(now, :min)
  offsets = (0...minutes).map { |n| n * 60 }
  offsets.reverse.map { |back| newest - back }
end

.pipeline_hgetall(keys) ⇒ Object



199
200
201
202
203
# File 'lib/wurk/metrics/query.rb', line 199

# Sends one HGETALL per key through a single pipelined call.
#
# @param keys [Array] Redis keys to read
# @return [Array] [] without touching Redis when +keys+ is empty; otherwise
#   whatever the client's `pipelined` block returns (presumably one reply per
#   key, in order — confirm against the client in use)
def pipeline_hgetall(keys)
  return [] if keys.empty?

  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      keys.each { |key| pipe.call('HGETALL', key) }
    end
  end
end

.pipeline_hmget(keys, fields) ⇒ Object



205
206
207
208
209
# File 'lib/wurk/metrics/query.rb', line 205

# Sends one HMGET per key, asking for the same fields each time, through a
# single pipelined call.
#
# @param keys [Array] Redis keys to read
# @param fields [Array] hash fields requested from every key
# @return [Array] [] without touching Redis when +keys+ is empty; otherwise
#   whatever the client's `pipelined` block returns (presumably one reply per
#   key, in order — confirm against the client in use)
def pipeline_hmget(keys, fields)
  return [] if keys.empty?

  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      keys.each { |key| pipe.call('HMGET', key, *fields) }
    end
  end
end

.queue_bucket_hashes(bucket, starts) ⇒ Object



81
82
83
84
# File 'lib/wurk/metrics/query.rb', line 81

# Fetches the queue-rollup bucket for every start time and normalises each
# reply to a Hash.
#
# @param bucket [Object] bucket name passed to QueueRollup.bucket_key
# @param starts [Array] bucket start times
# @return [Array<Hash>] one Hash per reply: flat `[field, value, ...]` Arrays
#   are paired up, nil replies become `{}`, anything else passes through
def queue_bucket_hashes(bucket, starts)
  keys = starts.map { |start| Wurk::Metrics::QueueRollup.bucket_key(bucket, start) }
  pipeline_hgetall(keys).map do |reply|
    if reply.is_a?(::Array)
      reply.each_slice(2).to_h
    else
      reply || {}
    end
  end
end

.queue_history(bucket, window_seconds, queues: nil, now: ::Time.now) ⇒ Object

Per-queue size/latency gauge time-series written by Metrics::QueueRollup. `bucket` is '1m'/'5m'/'1h'; `window_seconds` is clamped to the bucket's retention. Returns one entry per live queue (or the explicit `queues:` list) — `[{name:, points: [{at:, size:, latency:}, …]}, …]` — oldest→newest, gap-filled with zeros so a chart has a continuous x-axis. Capped at MAX_QUEUE_SERIES queues to bound the payload; the cap is applied without logging because queue cardinality is small.



71
72
73
74
75
76
77
78
79
# File 'lib/wurk/metrics/query.rb', line 71

# Per-queue size/latency series read from the queue-rollup buckets.
#
# @param bucket [Object] key accepted by bucket_spec!
# @param window_seconds [Object] requested window; clamped to the bucket's ttl
# @param queues [Array, nil] explicit queue names; nil lets queue_names choose
# @param now [Time] newest edge of the window
# @return [Array<Hash>] one `{ name:, points: }` entry per queue name, or []
#   when there are no names (the buckets are not read in that case)
# @raise [ArgumentError] for an unknown bucket or a non-positive window
def queue_history(bucket, window_seconds, queues: nil, now: ::Time.now)
  step, ttl = bucket_spec!(bucket)
  window = clamp_history_window!(window_seconds, ttl)
  starts = bucket_starts(now, step, window)

  names = queue_names(queues)
  return [] if names.empty?

  hashes = queue_bucket_hashes(bucket, starts)
  names.map do |queue|
    { name: queue, points: queue_points(queue, starts, hashes) }
  end
end

.queue_names(queues) ⇒ Object



88
89
90
91
# File 'lib/wurk/metrics/query.rb', line 88

# Picks the queue names to chart.
#
# @param queues [Array, nil] explicit names; when nil (or false) the members
#   of the Wurk::Keys::QUEUES_SET set are fetched with SMEMBERS instead
# @return [Array] the names sorted, truncated to MAX_QUEUE_SERIES entries
def queue_names(queues)
  candidates = queues
  candidates ||= Wurk.redis { |conn| conn.call('SMEMBERS', Wurk::Keys::QUEUES_SET) }
  candidates.sort.take(MAX_QUEUE_SERIES)
end

.queue_points(name, starts, hashes) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/wurk/metrics/query.rb', line 93

# Builds the point list for one queue from the already-fetched buckets.
#
# Looks up "<name>|SIZE_KIND" and "<name>|LAT_KIND" in each bucket hash; a
# missing size becomes 0 and a missing latency becomes 0.0.
#
# @param name [String] queue name used as the field prefix
# @param starts [Array] bucket start times
# @param hashes [Array<Hash>] bucket contents, positionally matching +starts+
# @return [Array<Hash>] `{ at:, size:, latency: }` per start, in input order
def queue_points(name, starts, hashes)
  kinds = Wurk::Metrics::QueueRollup
  size_key = "#{name}|#{kinds::SIZE_KIND}"
  latency_key = "#{name}|#{kinds::LAT_KIND}"

  starts.zip(hashes).map do |at, bucket|
    size = bucket[size_key].to_i
    latency = (bucket[latency_key] || 0).to_f
    { at: at, size: size, latency: latency }
  end
end

.top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now) ⇒ Object

Aggregate per-job-class totals over a recent window of minute buckets. Returns array of `[class_name, {p:, f:, ms:}]` tuples sorted by volume (p + f) descending so the UI's "top jobs" table renders without a second sort pass.



36
37
38
39
40
41
42
# File 'lib/wurk/metrics/query.rb', line 36

# Per-class totals over a recent window of minute buckets, busiest first.
#
# @param class_filter [String, nil] keep only classes whose name starts with
#   this prefix; nil or "" disables filtering
# @param minutes [Object] window in minutes (ignored when +hours+ is given)
# @param hours [Object, nil] window in hours; converted to minutes, so the
#   resulting window is still subject to cap_minutes!
# @param now [Time] newest edge of the window
# @return [Array<Array>] `[class_name, { p:, f:, ms: }]` pairs sorted by
#   `p + f` descending
# @raise [ArgumentError] from cap_hours! / cap_minutes! on an invalid window
def top_jobs(class_filter: nil, minutes: 60, hours: nil, now: ::Time.now)
  # Coerce and validate hours before multiplying: `"2" * 60` would repeat the
  # String instead of yielding 120 and then fail with a misleading minutes error.
  minutes = cap_hours!(hours) * 60 if hours
  rows = aggregate_minutes(now, cap_minutes!(minutes)).to_a
  rows = rows.select { |(name, _)| name.start_with?(class_filter) } if class_filter && !class_filter.empty?
  rows.sort_by { |(_name, stats)| -(stats[:p] + stats[:f]) }
end

.validate_for_job!(klass, minutes, hours) ⇒ Object

Raises:

  • (ArgumentError)


121
122
123
124
125
# File 'lib/wurk/metrics/query.rb', line 121

# Argument checks for for_job: a class name plus exactly one window option.
#
# @param klass [#empty?, nil] job class name
# @param minutes [Object, nil] minute window, if supplied
# @param hours [Object, nil] hour window, if supplied
# @return [nil]
# @raise [ArgumentError] when +klass+ is nil/empty, when both windows are
#   given, or when neither is
def validate_for_job!(klass, minutes, hours)
  raise ArgumentError, 'klass required' if klass.nil? || klass.empty?

  if minutes && hours
    raise ArgumentError, 'pass exactly one of minutes: or hours:'
  elsif minutes.nil? && hours.nil?
    raise ArgumentError, 'pass minutes: or hours:'
  end
end

.zip_rows(timestamps, rows) ⇒ Object



170
171
172
# File 'lib/wurk/metrics/query.rb', line 170

# Pairs each timestamp with its `[p, f, ms]` reply and shapes it into a row.
#
# @param timestamps [Array] one entry per bucket
# @param rows [Array<Array, nil>] replies positionally matching +timestamps+;
#   nil rows and nil fields are reported as 0
# @return [Array<Hash>] `{ at:, p:, f:, ms: }` per timestamp, in input order
def zip_rows(timestamps, rows)
  timestamps.zip(rows).map do |at, row|
    p_raw, f_raw, ms_raw = row
    { at: at, p: p_raw.to_i, f: f_raw.to_i, ms: ms_raw.to_i }
  end
end