Class: Wurk::Metrics::QueueRollup
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/metrics/queue_rollup.rb
Overview
Leader-only background thread that snapshots each queue’s current depth (LLEN) and head-of-line latency into compact per-queue gauge buckets the dashboard’s “queue size / latency over time” charts read directly:
qm|1m|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 24h
qm|5m|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 7d
qm|1h|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 30d
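This page does not show how `queue_gauges` measures head-of-line latency. One common approach, sketched here purely as an assumption, reads the payload of the job at the head of the queue and subtracts its enqueue timestamp from the current time. The `enqueued_at` field follows Sidekiq's job-payload convention. The helper `head_of_line_latency` is hypothetical, and Wurk's actual payload format may differ.

```ruby
require 'json'

# Hypothetical helper: assumes the head job's JSON payload carries an
# "enqueued_at" epoch-seconds float (Sidekiq's convention, assumed here).
def head_of_line_latency(head_payload, now: Time.now)
  # Assumption: an empty queue reports zero latency.
  return 0.0 if head_payload.nil?

  enqueued_at = JSON.parse(head_payload).fetch('enqueued_at').to_f
  # Clamp to zero so clock skew between hosts can't yield a negative gauge.
  [now.to_f - enqueued_at, 0.0].max
end

now = Time.at(1_700_000_100)
head = JSON.generate({ 'class' => 'HardJob', 'enqueued_at' => 1_700_000_040.0 })
puts head_of_line_latency(head, now: now) # prints 60.0
puts head_of_line_latency(nil, now: now)  # prints 0.0
```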
Unlike Metrics::Rollup, which SUMS counters rolled up from a source, size and latency are GAUGES (point-in-time values). Each tick therefore samples “now” and writes it to the current bucket at every resolution. HSET overwrites existing fields, so within a coarse (5m/1h) bucket each per-minute tick replaces the previous one, and the bucket ends up holding the latest sample in its window. This “last value” downsample is the right summary for a gauge. The thread is leader-gated so that N workers don’t each sample the same queues every minute. `<epoch>` is the UTC start of the bucket in Unix seconds, i.e. the current time floored to a multiple of the bucket step.
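To make the “last value” downsample concrete, the sketch below replaces Redis with an in-memory hash that, like HSET, simply overwrites a field on every write. The `store` hash, the `write_sample` lambda and the step table are illustrative stand-ins, not Wurk code. Three per-minute ticks that fall inside one 5-minute window leave three 1m buckets but only a single 5m bucket, and that bucket holds the latest value.

```ruby
# In-memory stand-in for Redis (key => { field => value }). Like HSET, an
# assignment to an existing field simply overwrites it.
store = Hash.new { |h, k| h[k] = {} }

# Assumed bucket steps in seconds for the 1m / 5m / 1h resolutions.
steps = { '1m' => 60, '5m' => 300, '1h' => 3600 }

# One tick: write the same size sample into the current bucket at every resolution.
write_sample = lambda do |now_i, queue, size|
  steps.each do |bucket, step|
    epoch = (now_i / step) * step # UTC start-of-bucket
    store["qm|#{bucket}|#{epoch}"]["#{queue}|sz"] = size
  end
end

# Three per-minute ticks, all inside one 5-minute window.
t0 = 1_700_000_100 # a multiple of 300, so it starts a 5m bucket
write_sample.call(t0, 'default', 12)
write_sample.call(t0 + 60, 'default', 30)
write_sample.call(t0 + 120, 'default', 7)

puts store["qm|5m|#{t0}"]['default|sz']               # prints 7: latest sample wins
puts store.keys.count { |k| k.start_with?('qm|1m|') } # prints 3
```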
Spec: docs/target/sidekiq-ent.md §5.2 (sidekiq.queue.size / sidekiq.queue.latency gauges), §7 Historical tab.
Constant Summary
- PREFIX = 'qm'
- SIZE_KIND = 'sz'
- LAT_KIND = 'lt'
- BUCKETS = Wurk::Metrics::Rollup::BUCKETS
  Mirrors Metrics::Rollup retention so the dashboard’s range selector (24h·1m / 7d·5m / 30d·1h) maps 1:1 to both the throughput and the queue-gauge series.
- DEFAULT_TICK_SECONDS = 60
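`BUCKETS` is not spelled out on this page. The destructuring `|bucket, (step, ttl)|` in #sample implies a hash from bucket name to a `[step, ttl]` pair. Assuming the steps and TTLs given in the overview (the name `ASSUMED_BUCKETS` is hypothetical), dividing retention by step gives the number of points each range of the selector can plot.

```ruby
# Assumed shape of BUCKETS: bucket name => [step_seconds, ttl_seconds].
# Values are taken from the TTLs in the overview, not from Metrics::Rollup.
ASSUMED_BUCKETS = {
  '1m' => [60, 24 * 3600],       # 24h of 1-minute points
  '5m' => [300, 7 * 24 * 3600],  # 7d of 5-minute points
  '1h' => [3600, 30 * 24 * 3600] # 30d of 1-hour points
}.freeze

# Same destructuring as #sample: |bucket, (step, ttl)|
ASSUMED_BUCKETS.each do |bucket, (step, ttl)|
  puts "#{bucket}: #{ttl / step} points"
end
# 1m: 1440 points
# 5m: 2016 points
# 1h: 720 points
```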
Constants included from Component
Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE
Instance Attribute Summary
Attributes included from Component
Class Method Summary
- .bucket_key(bucket, epoch) ⇒ Object
Instance Method Summary
- #initialize(config) ⇒ QueueRollup (constructor)
  A new instance of QueueRollup.
- #sample(now = ::Time.now) ⇒ Object
  One sampling pass, bypassing the leader gate and the sleep loop.
- #start ⇒ Object
- #terminate ⇒ Object
- #tick(now: ::Time.now) ⇒ Object
  Leader-gated: only the elected leader samples, so N workers don’t each HSET the same buckets every minute.
Methods included from Component
#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(config) ⇒ QueueRollup
Returns a new instance of QueueRollup.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 46
    def initialize(config)
      @config = config
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
      @thread = nil
    end
Class Method Details
.bucket_key(bucket, epoch) ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 42
    def self.bucket_key(bucket, epoch)
      "#{PREFIX}|#{bucket}|#{epoch}"
    end
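Here is a standalone sketch of the key scheme: `bucket_key` plus the start-of-bucket flooring that #sample applies. `QueueKeySketch` and its `keys_for` helper are hypothetical, and the step table is restated rather than read from `BUCKETS`. For one timestamp it yields the three keys a single tick writes to.

```ruby
# Standalone copy of the key scheme for illustration; PREFIX and the
# bucket steps are restated here rather than loaded from Wurk.
module QueueKeySketch
  PREFIX = 'qm'
  STEPS = { '1m' => 60, '5m' => 300, '1h' => 3600 }.freeze # assumed steps

  def self.bucket_key(bucket, epoch)
    "#{PREFIX}|#{bucket}|#{epoch}"
  end

  # Floors a timestamp to the UTC start of each bucket, as #sample does.
  def self.keys_for(time)
    STEPS.map { |bucket, step| bucket_key(bucket, (time.to_i / step) * step) }
  end
end

puts QueueKeySketch.keys_for(Time.utc(2024, 1, 1, 12, 34, 56))
# qm|1m|1704112440
# qm|5m|1704112200
# qm|1h|1704110400
```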
Instance Method Details
#sample(now = ::Time.now) ⇒ Object
One sampling pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “sample now” can drive it directly.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 84
    def sample(now = ::Time.now)
      gauges = queue_gauges
      return if gauges.empty?

      # Flatten to HSET's alternating field/value form: "<queue>|sz", size, "<queue>|lt", latency.
      fields = gauges.flat_map do |name, (size, lat)|
        ["#{name}|#{SIZE_KIND}", size, "#{name}|#{LAT_KIND}", lat]
      end
      BUCKETS.each do |bucket, (step, ttl)|
        # Floor to the UTC start of the bucket; HSET overwrites, so the latest sample wins.
        key = self.class.bucket_key(bucket, (now.to_i / step) * step)
        redis do |c|
          c.call('HSET', key, *fields)
          # Refreshed on every write, so retention counts from the bucket's last sample.
          c.call('EXPIRE', key, ttl)
        end
      end
      nil
    end
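#sample writes both gauges for every queue in one HSET call by flattening `queue_gauges` into alternating field/value pairs. The snapshot below is invented, since this page shows only the `|name, (size, lat)|` shape of what `queue_gauges` returns. `decode_bucket` is a hypothetical reader-side inverse. It splits each field on its last `|`, so a queue name that itself contains `|` still decodes correctly. It also converts values back from the strings Redis returns.

```ruby
# Invented snapshot: queue name => [size, latency_seconds]. Only the
# |name, (size, lat)| shape is taken from #sample.
gauges = { 'default' => [12, 3.5], 'mailers' => [0, 0.0] }

# Encode: the same flat_map #sample uses to build HSET's field/value list.
fields = gauges.flat_map do |name, (size, lat)|
  ["#{name}|sz", size, "#{name}|lt", lat]
end
p fields
# prints ["default|sz", 12, "default|lt", 3.5, "mailers|sz", 0, "mailers|lt", 0.0]

# Decode (hypothetical reader): HGETALL returns strings, so split each
# field on its last '|' and convert the value by kind.
def decode_bucket(raw)
  raw.each_with_object({}) do |(field, value), out|
    queue, _sep, kind = field.rpartition('|')
    entry = (out[queue] ||= {})
    case kind
    when 'sz' then entry[:size] = Integer(value)
    when 'lt' then entry[:latency] = Float(value)
    end
  end
end

# Simulate the all-strings hash HGETALL would return for this bucket.
raw = fields.each_slice(2).to_h { |field, value| [field, value.to_s] }
decoded = decode_bucket(raw)
puts decoded['default'][:latency] # prints 3.5
```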
#start ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 55
    def start
      @thread ||= safe_thread('queue-metrics') do # rubocop:disable Naming/MemoizedInstanceVariableName
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 65
    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
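#start and #terminate coordinate through a Mutex and a ConditionVariable, but this page does not show the private `wait` they rely on. The sketch below assumes `wait` is a timed `ConditionVariable#wait` on the tick interval, with a check of `@done` under the mutex so that a terminate arriving before the thread starts sleeping is not lost. `TickLoopSketch` is hypothetical and counts ticks instead of sampling. Terminating it mid-sleep returns almost at once instead of after the full 60 s interval.

```ruby
# Hypothetical reconstruction: the private #wait used by QueueRollup#start is
# not shown, so this assumes a timed ConditionVariable#wait that #terminate
# can cut short with #signal.
class TickLoopSketch
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
    @ticks = 0
  end

  # Same loop shape as QueueRollup#start: sleep first, then tick until done.
  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for #tick
        wait
      end
    end
  end

  # Same as QueueRollup#terminate: flip the flag and wake the sleeper.
  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join
    @thread&.join
  end

  private

  # Checking @done under the mutex means a terminate that lands before the
  # thread starts waiting is not lost; the thread simply skips the sleep.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

loop_sketch = TickLoopSketch.new(60) # 60 s, like DEFAULT_TICK_SECONDS
loop_sketch.start
sleep 0.1 # let the thread enter its first wait
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
loop_sketch.terminate
loop_sketch.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('stopped after %.3fs', elapsed)
```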
#tick(now: ::Time.now) ⇒ Object
Leader-gated: only the elected leader samples, so N workers don’t each HSET the same buckets every minute.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 74
    def tick(now: ::Time.now)
      return unless leader?

      sample(now)
    rescue StandardError => e
      handle_exception(e, { context: 'queue-metrics' })
    end
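The rescue lives in #tick rather than around the loop in #start, so a failed sample is reported and the thread keeps running. The harness below copies the control flow of #tick and stubs `leader?`, `sample` and `handle_exception`. `TickSketch` and its constructor options are hypothetical. It shows three cases: a follower skips sampling, a leader samples once, and a failing sample is recorded instead of raised.

```ruby
# Hypothetical harness: leader?, sample, and handle_exception are stubs so
# #tick's control flow can run without Redis or a leader election.
class TickSketch
  attr_reader :samples, :errors

  def initialize(leader:, fail_with: nil)
    @leader = leader
    @fail_with = fail_with
    @samples = []
    @errors = []
  end

  # Same body as QueueRollup#tick.
  def tick(now: Time.now)
    return unless leader?

    sample(now)
  rescue StandardError => e
    handle_exception(e, { context: 'queue-metrics' })
  end

  private

  def leader?
    @leader
  end

  # Raises a RuntimeError when fail_with is set, to simulate a Redis error.
  def sample(now)
    raise @fail_with if @fail_with

    @samples << now
  end

  # Records the error instead of logging it.
  def handle_exception(error, ctx)
    @errors << [error.message, ctx[:context]]
  end
end

follower = TickSketch.new(leader: false)
follower.tick
leader = TickSketch.new(leader: true)
leader.tick
flaky = TickSketch.new(leader: true, fail_with: 'redis down')
flaky.tick # the error is reported, not raised

puts follower.samples.size # prints 0
puts leader.samples.size   # prints 1
p flaky.errors             # prints [["redis down", "queue-metrics"]]
```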