Class: Wurk::Stats

Inherits:
Object
Defined in:
lib/wurk/stats.rb

Overview

Read-only inspector for cluster state in Redis. The cheap counters are eagerly pipelined at initialize so a single instance can answer many questions without re-querying; the unbounded ones (`enqueued`, `workers_size`, `queue_summaries`) re-fetch lazily.

Wire-compat is sacred: every key matches the Sidekiq OSS schema exactly. Spec: docs/target/sidekiq-free.md §19.1.
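
The eager/lazy split described above can be illustrated with a minimal sketch. `FakeStore`, `StatsSketch`, `read_many`, and `read` are hypothetical names invented for this example and are not part of Wurk; the store only counts round trips so the difference between cached and re-fetched values is visible.

```ruby
# Hypothetical stand-in for a Redis connection; it counts round trips.
class FakeStore
  attr_reader :calls

  def initialize(data)
    @data = data
    @calls = 0
  end

  # One round trip for many keys, standing in for a pipeline.
  def read_many(*keys)
    @calls += 1
    keys.map { |k| @data.fetch(k, 0) }
  end

  # One round trip for a single key.
  def read(key)
    @calls += 1
    @data.fetch(key, 0)
  end
end

# Sketch of the pattern only; this is not the real Wurk::Stats.
class StatsSketch
  def initialize(store)
    @store = store
    # Eager: cheap counters are read once and cached for the instance's life.
    processed, failed = store.read_many('stat:processed', 'stat:failed')
    @stats = { processed: processed, failed: failed }
  end

  def processed = @stats.fetch(:processed)
  def failed    = @stats.fetch(:failed)

  # Lazy: re-read on every call, so the value is fresh but never free.
  def enqueued = @store.read('queue:default')
end

store = FakeStore.new({ 'stat:processed' => 10, 'stat:failed' => 2, 'queue:default' => 5 })
stats = StatsSketch.new(store)
```

In this sketch, repeated calls to `processed` or `failed` never touch the store again, while each call to `enqueued` costs one more round trip.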

Defined Under Namespace

Classes: History, QueueSummary

Instance Method Summary

Constructor Details

#initialize ⇒ Stats

Returns a new instance of Stats.



# File 'lib/wurk/stats.rb', line 20

def initialize
  fetch_stats_fast!
end

Instance Method Details

#dead_size ⇒ Object



# File 'lib/wurk/stats.rb', line 29

def dead_size       = @stats.fetch(:dead_size)

#default_queue_latency ⇒ Object

Latency in seconds of the `default` queue, the most-asked-about gauge.



# File 'lib/wurk/stats.rb', line 90

def default_queue_latency
  now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  payload = Wurk.redis { |c| c.call('LRANGE', Keys.queue('default'), -1, -1) }.first
  compute_latency(payload, now_ms)
end
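
The `compute_latency` helper is not shown on this page. A minimal sketch of what such a helper could look like follows; `compute_latency_sketch` is a hypothetical name, and the sketch assumes the job payload is JSON whose `enqueued_at` field is an Integer of epoch milliseconds, with a fallback for Float epoch seconds. Neither assumption is confirmed by this page.

```ruby
require 'json'

# Hypothetical helper: seconds elapsed since the job at the queue's tail
# was enqueued. Returns 0.0 for an empty queue or a missing timestamp.
def compute_latency_sketch(payload, now_ms)
  return 0.0 if payload.nil? # LRANGE on an empty queue yields [], so .first is nil

  enqueued_at = JSON.parse(payload)['enqueued_at']
  case enqueued_at
  when Integer then (now_ms - enqueued_at) / 1000.0 # assumed: epoch milliseconds
  when Float   then (now_ms / 1000.0) - enqueued_at # assumed: epoch seconds
  else 0.0
  end
end

now_ms = 1_700_000_005_000
fresh  = JSON.generate('class' => 'HardJob', 'enqueued_at' => 1_700_000_002_500)

compute_latency_sketch(fresh, now_ms) # => 2.5
compute_latency_sketch(nil, now_ms)   # => 0.0
```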

#enqueued ⇒ Object

Sum of LLEN across every known queue. Linear in queue count — the spec labels this “slow” upstream; don’t put it on a hot path.



# File 'lib/wurk/stats.rb', line 34

def enqueued
  queues.each_value.sum
end

#expired ⇒ Object



# File 'lib/wurk/stats.rb', line 26

def expired         = @stats.fetch(:expired)

#failed ⇒ Object



# File 'lib/wurk/stats.rb', line 25

def failed          = @stats.fetch(:failed)

#processed ⇒ Object



# File 'lib/wurk/stats.rb', line 24

def processed       = @stats.fetch(:processed)

#processes_size ⇒ Object



# File 'lib/wurk/stats.rb', line 30

def processes_size  = @stats.fetch(:processes_size)

#queue_summaries ⇒ Array<QueueSummary>

Returns summaries in the same size-descending order as Sidekiq’s detailed queue list (`queue_summaries.sort_by { |qd| -qd.size }`). This feeds the dashboard’s queue table (api_controller#queues).

Returns:

  • (Array<QueueSummary>)

    one per known queue, largest first.



# File 'lib/wurk/stats.rb', line 73

def queue_summaries
  Wurk.redis do |conn|
    names = conn.call('SMEMBERS', Keys::QUEUES_SET)
    next [] if names.empty?

    paused_set = conn.call('SMEMBERS', 'paused')
    results = conn.pipelined do |pipe|
      names.each do |q|
        pipe.call('LLEN', Keys.queue(q))
        pipe.call('LRANGE', Keys.queue(q), -1, -1)
      end
    end
    build_summaries(names, results, paused_set).sort_by { |qd| -qd.size }
  end
end
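
The pipeline above issues two commands per queue, so the replies come back interleaved as LLEN, LRANGE, LLEN, LRANGE, and so on. The `build_summaries` helper is not shown on this page; the sketch below is one way the pairing could work. `QueueSummarySketch`, its fields, and `build_summaries_sketch` are hypothetical, and the real `QueueSummary` may carry different attributes.

```ruby
# Hypothetical shape; the real QueueSummary may differ.
QueueSummarySketch = Struct.new(:name, :size, :paused, :tail_payload, keyword_init: true)

def build_summaries_sketch(names, results, paused_set)
  # Each queue contributed two replies, in order: LLEN, then LRANGE -1 -1.
  pairs = results.each_slice(2).to_a
  names.zip(pairs).map do |name, (size, tail)|
    QueueSummarySketch.new(
      name: name,
      size: size.to_i,
      paused: paused_set.include?(name),
      tail_payload: tail.first # nil when the queue is empty
    )
  end
end

names   = %w[default mailers low]
results = [2, ['{"jid":"a"}'], 7, ['{"jid":"b"}'], 0, []]

summaries = build_summaries_sketch(names, results, ['low'])
            .sort_by { |qd| -qd.size }
summaries.map(&:name) # => ["mailers", "default", "low"]
```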

#queues ⇒ Hash{String=>Integer}

`SMEMBERS` order is arbitrary; Sidekiq sorts the name/size pairs by size descending before building the hash (Stats#queues, spec §19.1), and gems and dashboards reading this rely on that order. Match it exactly with `sort_by { |_, size| -size }`.

Returns:

  • (Hash{String=>Integer})

    queue name → LLEN, largest queue first.



# File 'lib/wurk/stats.rb', line 57

def queues
  Wurk.redis do |conn|
    names = conn.call('SMEMBERS', Keys::QUEUES_SET)
    next {} if names.empty?

    sizes = conn.pipelined do |pipe|
      names.each { |q| pipe.call('LLEN', Keys.queue(q)) }
    end
    names.zip(sizes.map(&:to_i)).sort_by { |_, size| -size }.to_h
  end
end
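
The ordering guarantee relies on Ruby hashes preserving insertion order. A small standalone illustration of the last line of the method, using made-up queue names and sizes:

```ruby
names = %w[low default mailers] # arbitrary SMEMBERS order
sizes = [3, 12, 0]              # LLEN replies, one per name, same order

# Pair each name with its size, sort largest first, then build the hash.
queues = names.zip(sizes.map(&:to_i)).sort_by { |_, size| -size }.to_h

queues      # => {"default"=>12, "low"=>3, "mailers"=>0}
queues.keys # => ["default", "low", "mailers"]
```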

#reset(*stats) ⇒ Object

Resets the named global counters. With no args, clears `processed`, `failed`, and `expired`. Names outside that set are ignored. Uses SET … 0 rather than DEL, so the key stays present and reads return an `Integer` instead of `nil`.



# File 'lib/wurk/stats.rb', line 99

def reset(*stats)
  all      = %w[failed processed expired]
  to_clear = stats.empty? ? all : all & stats.flatten.map(&:to_s)
  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      to_clear.each { |s| pipe.call('SET', "stat:#{s}", 0) }
    end
  end
end
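
The argument handling can be tried in isolation. The sketch below lifts the first two lines of the method into a hypothetical helper, `counters_to_clear`, to show how symbols, nested arrays, and unknown names are treated; no Redis call is made.

```ruby
# Hypothetical helper mirroring the filtering step of #reset.
def counters_to_clear(*stats)
  all = %w[failed processed expired]
  # Array#& keeps the order of `all` and drops anything not in it.
  stats.empty? ? all : all & stats.flatten.map(&:to_s)
end

counters_to_clear                     # => ["failed", "processed", "expired"]
counters_to_clear(:processed)         # => ["processed"]
counters_to_clear(%w[expired failed]) # => ["failed", "expired"]
counters_to_clear('bogus', :failed)   # => ["failed"]
counters_to_clear('bogus')            # => []
```

Note the last case: passing only unrecognized names clears nothing, which differs from passing no arguments at all.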

#retry_size ⇒ Object



# File 'lib/wurk/stats.rb', line 28

def retry_size      = @stats.fetch(:retry_size)

#scheduled_size ⇒ Object



# File 'lib/wurk/stats.rb', line 27

def scheduled_size  = @stats.fetch(:scheduled_size)

#workers_size ⇒ Object

Sum of the `busy` HASH field across every live process identity. Pipelined, but the cost grows with the number of processes.



# File 'lib/wurk/stats.rb', line 40

def workers_size
  Wurk.redis do |conn|
    identities = conn.call('SMEMBERS', Keys::PROCESSES)
    next 0 if identities.empty?

    busy = conn.pipelined do |pipe|
      identities.each { |id| pipe.call('HGET', id, 'busy') }
    end
    busy.sum(&:to_i)
  end
end
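
The final `sum(&:to_i)` also absorbs missing values: HGET replies with nil when the hash or field does not exist, and `nil.to_i` is 0. A small illustration with made-up replies, assuming the client returns field values as strings:

```ruby
# Made-up pipelined HGET replies: strings for processes that reported a
# busy count, nil for an identity whose hash or field is missing.
busy = ['5', '0', nil, '12']

total = busy.sum(&:to_i)
total # => 17
```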