Class: Wurk::Metrics::QueueRollup

Inherits:
Object
  • Object
Includes:
Component
Defined in:
lib/wurk/metrics/queue_rollup.rb

Overview

Leader-only background thread that snapshots each queue’s current depth (LLEN) and head-of-line latency into compact per-queue gauge buckets, which the dashboard’s “queue size / latency over time” charts read directly:

qm|1m|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 24h
qm|5m|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 7d
qm|1h|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 30d

Unlike Metrics::Rollup (which SUMS counters rolled up from a source), size and latency are GAUGES, that is, point-in-time values, so each tick samples “now” and writes it to the current bucket at every resolution. Within a coarse (5m/1h) bucket the per-minute ticks overwrite one another, so the bucket holds the latest sample in its window (a “last value” downsample, which is the right summary for a gauge). Sampling is leader-gated so N workers don’t each sample the same queues every minute. `<epoch>` is the UTC start-of-bucket in Unix seconds.
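The bucket arithmetic and the “last value” behaviour described above can be illustrated with a minimal sketch. It runs against an in-memory hash rather than Redis. `ASSUMED_BUCKETS`, `bucket_epoch`, and `replay` are hypothetical names introduced for illustration, and the step and TTL values are inferred from the key layout above rather than copied from Wurk::Metrics::Rollup::BUCKETS.

```ruby
# Hypothetical stand-in for Wurk::Metrics::Rollup::BUCKETS.
# Shape assumed from the source: name => [step_seconds, ttl_seconds].
ASSUMED_BUCKETS = {
  '1m' => [60, 24 * 3600],
  '5m' => [300, 7 * 24 * 3600],
  '1h' => [3600, 30 * 24 * 3600]
}.freeze

# Floor a timestamp to the start of its bucket (Unix seconds).
def bucket_epoch(time, step)
  (time.to_i / step) * step
end

# Replay [time, depth] ticks into an in-memory stand-in for Redis hashes.
# Every tick writes to the current bucket at every resolution, so a later
# tick in the same coarse window overwrites the earlier value.
def replay(ticks, queue: 'default')
  store = Hash.new { |h, k| h[k] = {} }
  ticks.each do |now, depth|
    ASSUMED_BUCKETS.each do |name, (step, _ttl)|
      key = "qm|#{name}|#{bucket_epoch(now, step)}"
      store[key]["#{queue}|sz"] = depth
    end
  end
  store
end

# Two per-minute ticks that fall inside the same 5-minute and 1-hour windows.
ticks = [
  [Time.utc(2024, 1, 1, 12, 1, 5), 10],
  [Time.utc(2024, 1, 1, 12, 2, 5), 25]
]
replay(ticks).each { |key, fields| puts "#{key} #{fields}" }
```

Under these assumptions the two ticks land in separate 1m buckets, while the 5m and 1h buckets each keep only the second sample.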

Spec: docs/target/sidekiq-ent.md §5.2 (sidekiq.queue.size / sidekiq.queue.latency gauges), §7 Historical tab.

Constant Summary

PREFIX =
'qm'
SIZE_KIND =
'sz'
LAT_KIND =
'lt'
BUCKETS =

Mirror Metrics::Rollup retention so the dashboard’s range selector (24h·1m / 7d·5m / 30d·1h) maps 1:1 to both the throughput and the queue-gauge series.

Wurk::Metrics::Rollup::BUCKETS
DEFAULT_TICK_SECONDS =
60

Constants included from Component

Component::DEFAULT_THREAD_PRIORITY, Component::PROCESS_NONCE

Instance Attribute Summary

Attributes included from Component

#config

Class Method Summary

Instance Method Summary

Methods included from Component

#default_tag, #fire_event, #handle_exception, #hostname, #identity, #leader?, #logger, #mono_ms, #process_nonce, #real_ms, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ QueueRollup

Returns a new instance of QueueRollup.



# File 'lib/wurk/metrics/queue_rollup.rb', line 46

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
  @thread = nil
end

Class Method Details

.bucket_key(bucket, epoch) ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 42

def self.bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end

Instance Method Details

#sample(now = ::Time.now) ⇒ Object

One sampling pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual “sample now” can drive it directly.



# File 'lib/wurk/metrics/queue_rollup.rb', line 84

def sample(now = ::Time.now)
  gauges = queue_gauges
  return if gauges.empty?

  fields = gauges.flat_map do |name, (size, lat)|
    ["#{name}|#{SIZE_KIND}", size, "#{name}|#{LAT_KIND}", lat]
  end
  BUCKETS.each do |bucket, (step, ttl)|
    key = self.class.bucket_key(bucket, (now.to_i / step) * step)
    redis do |c|
      c.call('HSET', key, *fields)
      c.call('EXPIRE', key, ttl)
    end
  end
  nil
end
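For reference, the flat field layout written by #sample (`<queue>|sz`, `<queue>|lt`) could be decoded on the read side roughly as follows. This is a sketch only: `decode_bucket` is a hypothetical helper, not part of Wurk, the input is assumed to be an HGETALL-style hash of strings, and treating latency as a float is an assumption because this page does not state its unit or type.

```ruby
# Hypothetical helper, not part of Wurk. Turns the flat field/value hash
# stored in one bucket into { queue => { size:, latency: } }.
def decode_bucket(fields)
  fields.each_with_object({}) do |(field, value), out|
    # Split on the LAST '|' so a queue name containing '|' stays intact.
    queue, _sep, kind = field.rpartition('|')
    entry = (out[queue] ||= {})
    case kind
    when 'sz' then entry[:size] = value.to_i
    when 'lt' then entry[:latency] = value.to_f # assumed numeric latency
    end
  end
end

# Example input shaped like an HGETALL reply for one bucket key.
raw = {
  'default|sz' => '12', 'default|lt' => '0.75',
  'mailers|sz' => '0',  'mailers|lt' => '0.0'
}
p decode_bucket(raw)
```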

#start ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 55

def start
  @thread ||= safe_thread('queue-metrics') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 65

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
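The private `wait` method that #start calls is not shown on this page. A minimal sketch of how a Mutex plus ConditionVariable pair can give an interruptible sleep that #terminate wakes early might look like the following. `InterruptibleSleeper` and `done?` are hypothetical names, and the real implementation may differ.

```ruby
# Hypothetical illustration of the sleep/wake pattern; not the Wurk source.
class InterruptibleSleeper
  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  # Sleeps for up to @interval seconds. Returns immediately if terminate
  # has already been called, and returns early if it is called mid-sleep.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end

  # Sets the stop flag and wakes the sleeping thread, mirroring #terminate.
  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def done?
    @mutex.synchronize { @done }
  end
end

sleeper = InterruptibleSleeper.new(30)
worker = Thread.new { sleeper.wait }
sleeper.terminate # wakes the worker without waiting out the 30 seconds
worker.join
```

Checking the flag under the same mutex before waiting avoids a lost wakeup when terminate runs before the thread reaches its sleep.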

#tick(now: ::Time.now) ⇒ Object

Leader-gated: only the elected leader samples, so N workers don’t each HSET the same buckets every minute.



# File 'lib/wurk/metrics/queue_rollup.rb', line 74

def tick(now: ::Time.now)
  return unless leader?

  sample(now)
rescue StandardError => e
  handle_exception(e, { context: 'queue-metrics' })
end