Class: Wurk::Stats
- Inherits: Object
- Defined in: lib/wurk/stats.rb
Overview
Read-only inspector for cluster state in Redis. The cheap counters are eagerly pipelined in `initialize`, so a single instance can answer many questions without re-querying; the unbounded ones (`enqueued`, `workers_size`, `queue_summaries`) re-fetch lazily.
Wire-compat is sacred: every key matches the Sidekiq OSS schema exactly. Spec: docs/target/sidekiq-free.md §19.1.
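The eager/lazy split described above can be illustrated without Redis. The following is a minimal sketch, not the Wurk implementation: `FakeStore` and `SnapshotStats` are hypothetical names, and the fake store only counts round trips so the difference between cached and re-fetched readers is visible.

```ruby
# Hypothetical stand-in for Redis that counts round trips.
class FakeStore
  attr_reader :round_trips

  def initialize(data)
    @data = data
    @round_trips = 0
  end

  # One call models one pipelined round trip.
  def fetch_many(keys)
    @round_trips += 1
    keys.to_h { |k| [k, @data.fetch(k, 0)] }
  end
end

# Hypothetical sketch of the eager/lazy pattern; not Wurk::Stats itself.
class SnapshotStats
  FAST_KEYS = %i[processed failed].freeze

  def initialize(store)
    @store = store
    @stats = store.fetch_many(FAST_KEYS) # eager: fetched once, then cached
  end

  def processed = @stats.fetch(:processed)
  def failed = @stats.fetch(:failed)

  # Lazy: goes back to the store on every call.
  def enqueued = @store.fetch_many(%i[enqueued]).fetch(:enqueued)
end

store = FakeStore.new(processed: 10, failed: 2, enqueued: 5)
stats = SnapshotStats.new(store)
stats.processed # served from the snapshot
stats.failed    # served from the snapshot
stats.enqueued  # costs one more round trip
```

One consequence of this pattern is that the cached counters reflect the moment of construction; a fresh instance is needed to see newer values.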
Defined Under Namespace
Classes: History, QueueSummary
Instance Method Summary
- #dead_size ⇒ Object
- #default_queue_latency ⇒ Object
  Latency (secs) of the `default` queue, the most-asked-about gauge.
- #enqueued ⇒ Object
  Sum of LLEN across every known queue.
- #expired ⇒ Object
- #failed ⇒ Object
- #initialize ⇒ Stats (constructor)
  A new instance of Stats.
- #processed ⇒ Object
- #processes_size ⇒ Object
- #queue_summaries ⇒ Array<QueueSummary>
  One per known queue.
- #queues ⇒ Hash{String=>Integer}
  Queue name → LLEN.
- #reset(*stats) ⇒ Object
  Resets the named global counters.
- #retry_size ⇒ Object
- #scheduled_size ⇒ Object
- #workers_size ⇒ Object
  Sum of the `busy` HASH field across every live process identity.
Constructor Details
#initialize ⇒ Stats
Returns a new instance of Stats.
    # File 'lib/wurk/stats.rb', line 20
    def initialize
      fetch_stats_fast!
    end
Instance Method Details
#dead_size ⇒ Object
    # File 'lib/wurk/stats.rb', line 29
    def dead_size = @stats.fetch(:dead_size)
#default_queue_latency ⇒ Object
Latency (secs) of the `default` queue, the most-asked-about gauge.
    # File 'lib/wurk/stats.rb', line 83
    def default_queue_latency
      now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      payload = Wurk.redis { |c| c.call('LRANGE', Keys.queue('default'), -1, -1) }.first
      compute_latency(payload, now_ms)
    end
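The `compute_latency` helper is not shown on this page. As a rough sketch of what such a calculation could look like, the hypothetical `sketch_latency` below assumes the payload is a JSON job hash whose `enqueued_at` field is either integer epoch milliseconds or legacy float epoch seconds. Both the field handling and the zero fallback for an empty queue are assumptions, not confirmed Wurk behavior.

```ruby
require 'json'

# Hypothetical illustration; the real compute_latency may differ.
# payload: JSON string of the last list element, or nil if the queue is empty.
# now_ms:  current wall-clock time in integer milliseconds.
def sketch_latency(payload, now_ms)
  return 0.0 if payload.nil?

  enqueued_at = JSON.parse(payload)['enqueued_at']
  case enqueued_at
  when Integer then (now_ms - enqueued_at) / 1000.0 # assumed: epoch milliseconds
  when Float   then (now_ms / 1000.0) - enqueued_at # assumed: legacy epoch seconds
  else 0.0                                          # no timestamp recorded
  end
end

job = JSON.generate('enqueued_at' => 1_000)
sketch_latency(job, 3_500) # latency in seconds
```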
#enqueued ⇒ Object
Sum of LLEN across every known queue. Linear in queue count; the spec labels this "slow" upstream, so don't put it on a hot path.
    # File 'lib/wurk/stats.rb', line 34
    def enqueued
      queues.each_value.sum
    end
#expired ⇒ Object
    # File 'lib/wurk/stats.rb', line 26
    def expired = @stats.fetch(:expired)
#failed ⇒ Object
    # File 'lib/wurk/stats.rb', line 25
    def failed = @stats.fetch(:failed)
#processed ⇒ Object
    # File 'lib/wurk/stats.rb', line 24
    def processed = @stats.fetch(:processed)
#processes_size ⇒ Object
    # File 'lib/wurk/stats.rb', line 30
    def processes_size = @stats.fetch(:processes_size)
#queue_summaries ⇒ Array<QueueSummary>
Returns one QueueSummary per known queue.
    # File 'lib/wurk/stats.rb', line 66
    def queue_summaries
      Wurk.redis do |conn|
        names = conn.call('SMEMBERS', Keys::QUEUES_SET)
        next [] if names.empty?

        paused_set = conn.call('SMEMBERS', 'paused')
        results = conn.pipelined do |pipe|
          names.each do |q|
            pipe.call('LLEN', Keys.queue(q))
            pipe.call('LRANGE', Keys.queue(q), -1, -1)
          end
        end
        build_summaries(names, results, paused_set)
      end
    end
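Because the pipeline issues two commands per queue, `results` comes back as one flat array with the LLEN and LRANGE replies interleaved. The `build_summaries` helper is not shown here, so the following is only a sketch of how such a reply could be regrouped; the hash keys (`name`, `size`, `oldest`, `paused`) are illustrative and not the actual `QueueSummary` fields.

```ruby
# Stand-ins for the replies the pipeline would return.
names      = %w[default mailers]
results    = [3, ['{"enqueued_at":1000}'], 0, []] # LLEN, LRANGE, LLEN, LRANGE
paused_set = ['mailers']

# Regroup the flat reply into one [size, tail] pair per queue name.
rows = names.zip(results.each_slice(2).to_a).map do |name, (size, tail)|
  {
    name: name,
    size: size.to_i,
    oldest: tail.first,              # nil when the queue is empty
    paused: paused_set.include?(name)
  }
end
```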
#queues ⇒ Hash{String=>Integer}
Returns a hash mapping each queue name to its LLEN.
    # File 'lib/wurk/stats.rb', line 53
    def queues
      Wurk.redis do |conn|
        names = conn.call('SMEMBERS', Keys::QUEUES_SET)
        next {} if names.empty?

        sizes = conn.pipelined do |pipe|
          names.each { |q| pipe.call('LLEN', Keys.queue(q)) }
        end
        names.zip(sizes.map(&:to_i)).to_h
      end
    end
#reset(*stats) ⇒ Object
Resets the named global counters. With no args, clears `processed`, `failed`, and `expired`. Uses SET … 0 rather than DEL, so the key stays around and reads remain `Integer` rather than `nil`.
    # File 'lib/wurk/stats.rb', line 92
    def reset(*stats)
      all = %w[failed processed expired]
      to_clear = stats.empty? ? all : all & stats.flatten.map(&:to_s)
      Wurk.redis do |conn|
        conn.pipelined do |pipe|
          to_clear.each { |s| pipe.call('SET', "stat:#{s}", 0) }
        end
      end
    end
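The argument filtering in `reset` can be tried in isolation. The sketch below extracts it into a hypothetical helper, `counters_to_clear` (not part of Wurk), to show that symbols, strings, and nested arrays are all accepted and that unknown names are silently dropped by the array intersection.

```ruby
# Hypothetical helper mirroring the filtering logic in #reset.
RESETTABLE = %w[failed processed expired].freeze

def counters_to_clear(*stats)
  return RESETTABLE if stats.empty?

  # Array#& keeps the receiver's order and discards names not in RESETTABLE.
  RESETTABLE & stats.flatten.map(&:to_s)
end

counters_to_clear                     # all three counters
counters_to_clear(:processed)         # only "processed"
counters_to_clear([:failed, 'bogus']) # "bogus" is ignored
```

A practical consequence is that a misspelled counter name does not raise; it simply resets nothing.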
#retry_size ⇒ Object
    # File 'lib/wurk/stats.rb', line 28
    def retry_size = @stats.fetch(:retry_size)
#scheduled_size ⇒ Object
    # File 'lib/wurk/stats.rb', line 27
    def scheduled_size = @stats.fetch(:scheduled_size)
#workers_size ⇒ Object
Sum of the `busy` HASH field across every live process identity. Pipelined, but the work grows with the number of live processes.
    # File 'lib/wurk/stats.rb', line 40
    def workers_size
      Wurk.redis do |conn|
        identities = conn.call('SMEMBERS', Keys::PROCESSES)
        next 0 if identities.empty?

        busy = conn.pipelined do |pipe|
          identities.each { |id| pipe.call('HGET', id, 'busy') }
        end
        busy.sum(&:to_i)
      end
    end
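The final `busy.sum(&:to_i)` step is tolerant of missing data: Redis HGET replies with nil when the hash or field does not exist, and `nil.to_i` is 0 in Ruby. A small sketch with made-up replies (the values are illustrative only):

```ruby
# Stand-ins for pipelined HGET replies: strings for live processes,
# nil for an identity whose hash has no `busy` field (or no longer exists).
busy = ['4', nil, '0', '12']

total = busy.sum(&:to_i) # nil and "0" both contribute zero
```

This suggests a process that disappears between the SMEMBERS call and the pipelined HGETs would simply count as zero busy workers rather than raising.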