Class: Wurk::Stats

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/stats.rb

Overview

Read-only inspector for cluster state in Redis. The cheap counters are eagerly pipelined at initialize so a single instance can answer many questions without re-querying; the unbounded ones (‘enqueued`, `workers_size`, `queue_summaries`) re-fetch lazily.

Wire-compat is sacred: every key matches the Sidekiq OSS schema exactly. Spec: docs/target/sidekiq-free.md §19.1.

Defined Under Namespace

Classes: History, QueueSummary

Instance Method Summary collapse

Constructor Details

#initializeStats

Returns a new instance of Stats.



20
21
22
# File 'lib/wurk/stats.rb', line 20

def initialize
  fetch_stats_fast!
end

Instance Method Details

#dead_sizeObject



29
# File 'lib/wurk/stats.rb', line 29

def dead_size       = @stats.fetch(:dead_size)

#default_queue_latencyObject

Latency (secs) of the ‘default` queue — the most-asked-about gauge.



83
84
85
86
87
# File 'lib/wurk/stats.rb', line 83

def default_queue_latency
  now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
  payload = Wurk.redis { |c| c.call('LRANGE', Keys.queue('default'), -1, -1) }.first
  compute_latency(payload, now_ms)
end

#enqueuedObject

Sum of LLEN across every known queue. Linear in queue count — the spec labels this “slow” upstream; don’t put it on a hot path.



34
35
36
# File 'lib/wurk/stats.rb', line 34

def enqueued
  queues.each_value.sum
end

#expiredObject



26
# File 'lib/wurk/stats.rb', line 26

def expired         = @stats.fetch(:expired)

#failedObject



25
# File 'lib/wurk/stats.rb', line 25

def failed          = @stats.fetch(:failed)

#processedObject



24
# File 'lib/wurk/stats.rb', line 24

def processed       = @stats.fetch(:processed)

#processes_sizeObject



30
# File 'lib/wurk/stats.rb', line 30

def processes_size  = @stats.fetch(:processes_size)

#queue_summariesArray<QueueSummary>

Returns one per known queue.

Returns:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/wurk/stats.rb', line 66

def queue_summaries
  Wurk.redis do |conn|
    names = conn.call('SMEMBERS', Keys::QUEUES_SET)
    next [] if names.empty?

    paused_set = conn.call('SMEMBERS', 'paused')
    results = conn.pipelined do |pipe|
      names.each do |q|
        pipe.call('LLEN', Keys.queue(q))
        pipe.call('LRANGE', Keys.queue(q), -1, -1)
      end
    end
    build_summaries(names, results, paused_set)
  end
end

#queuesHash{String=>Integer}

Returns queue name → LLEN.

Returns:

  • (Hash{String=>Integer})

    queue name → LLEN.



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/wurk/stats.rb', line 53

def queues
  Wurk.redis do |conn|
    names = conn.call('SMEMBERS', Keys::QUEUES_SET)
    next {} if names.empty?

    sizes = conn.pipelined do |pipe|
      names.each { |q| pipe.call('LLEN', Keys.queue(q)) }
    end
    names.zip(sizes.map(&:to_i)).to_h
  end
end

#reset(*stats) ⇒ Object

Resets the named global counters. With no args, clears ‘processed`, `failed`, and `expired`. SET … 0 (not DEL — keeps the key around so reads stay `Integer` not `nil`).



92
93
94
95
96
97
98
99
100
# File 'lib/wurk/stats.rb', line 92

def reset(*stats)
  all      = %w[failed processed expired]
  to_clear = stats.empty? ? all : all & stats.flatten.map(&:to_s)
  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      to_clear.each { |s| pipe.call('SET', "stat:#{s}", 0) }
    end
  end
end

#retry_sizeObject



28
# File 'lib/wurk/stats.rb', line 28

def retry_size      = @stats.fetch(:retry_size)

#scheduled_sizeObject



27
# File 'lib/wurk/stats.rb', line 27

def scheduled_size  = @stats.fetch(:scheduled_size)

#workers_sizeObject

Sum of the ‘busy` HASH field across every live process identity. Pipelined but unbounded by process count.



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/wurk/stats.rb', line 40

def workers_size
  Wurk.redis do |conn|
    identities = conn.call('SMEMBERS', Keys::PROCESSES)
    next 0 if identities.empty?

    busy = conn.pipelined do |pipe|
      identities.each { |id| pipe.call('HGET', id, 'busy') }
    end
    busy.sum(&:to_i)
  end
end