Class: Wurk::ProcessSet

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/wurk/process_set.rb

Overview

Live process list view backed by the `processes` SET plus per-identity hashes. Cluster topology lives here: dashboard process list, total concurrency gauge, RSS roll-up, leader lookup.

Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §1 (Redis schema), §19.6.

Constructor Details

#initialize(clean_plz = true) ⇒ ProcessSet

The `processes` SET stores only identity strings; the live hash at each identity expires after 60s, so SCARD can lag reality. The constructor opts into a `cleanup` (rate-limited globally to 1/min) so that the size reported by `#size` reflects the post-prune state, unless the caller explicitly skips it (e.g. inside `cleanup` itself, or for snapshot reads on hot paths).

Positional Boolean matches Sidekiq’s public API — wire-compat sacred.



# File 'lib/wurk/process_set.rb', line 27

def initialize(clean_plz = true) # rubocop:disable Style/OptionalBooleanParameter
  cleanup if clean_plz
end

Class Method Details

.[](identity) ⇒ Object

Fetch a single Process by identity. Pipelined SISMEMBER + HMGET so absence (process never registered) and expiry (heartbeat lapsed, info field gone) both return nil.



# File 'lib/wurk/process_set.rb', line 34

def self.[](identity)
  exists, fields = Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SISMEMBER', Keys::PROCESSES, identity)
      pipe.call('HMGET', identity, *LOOKUP_FIELDS)
    end
  end
  return nil if exists.to_i.zero? || fields.first.nil?

  build_process(LOOKUP_FIELDS.zip(fields).to_h)
end
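
The two nil paths can be exercised without Redis. The following is a minimal sketch, not the library's code: `SKETCH_LOOKUP_FIELDS` and `sketch_lookup` are hypothetical names, the field list is invented for illustration (the real `LOOKUP_FIELDS` is not shown on this page), and it assumes the first requested field is the one whose absence signals expiry.

```ruby
# Hypothetical field list; the real LOOKUP_FIELDS constant is not shown here.
# Assumption: the first field is the one that disappears when the hash expires.
SKETCH_LOOKUP_FIELDS = %w[info busy beat].freeze

# Mirrors the guard in ProcessSet.[]:
#   exists - the SISMEMBER reply (0 or 1)
#   fields - the HMGET reply (one value, or nil, per requested field)
def sketch_lookup(exists, fields)
  # Absent from the SET, or present but the live hash has expired.
  return nil if exists.to_i.zero? || fields.first.nil?

  # Stand-in for build_process: return the attribute hash as-is.
  SKETCH_LOOKUP_FIELDS.zip(fields).to_h
end
```

A never-registered identity (`exists` is 0) and a registered identity with an expired hash (`fields.first` is nil) are indistinguishable to the caller: both yield nil.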

Instance Method Details

#cleanup ⇒ Object

SREMs identities whose `info` HASH field has expired. Rate-limited globally to 1/min via SET NX EX so concurrent dashboards / API calls don’t dogpile the prune. Returns the number of identities removed, or 0 when the lock is held by another caller or nothing needs pruning.

Spec: docs/target/sidekiq-free.md §31.17.



# File 'lib/wurk/process_set.rb', line 52

def cleanup
  return 0 unless acquired_cleanup_lock?

  Wurk.redis do |conn|
    procs = conn.call('SMEMBERS', Keys::PROCESSES)
    next 0 if procs.empty?

    heartbeats = conn.pipelined do |pipe|
      procs.each { |key| pipe.call('HGET', key, 'info') }
    end
    to_prune = procs.zip(heartbeats).filter_map { |id, info| id if info.nil? }
    next 0 if to_prune.empty?

    conn.call('SREM', Keys::PROCESSES, *to_prune).to_i
  end
end
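
The lock-then-prune flow can be modelled in memory. This is a sketch under stated assumptions, not the actual implementation: `SketchStore`, `sketch_cleanup`, the lock key `sketch-cleanup-lock`, and the 60-second TTL are all hypothetical (the source states only "1/min" and does not show `acquired_cleanup_lock?`), and time is passed in explicitly so the behaviour is deterministic.

```ruby
# In-memory stand-in for the Redis features the cleanup relies on:
# SET key value NX EX ttl, a SET of identities, and per-identity hashes.
class SketchStore
  attr_reader :processes, :hashes

  def initialize(processes, hashes)
    @processes = processes.dup # identities, as in the `processes` SET
    @hashes = hashes           # identity => { field => value }
    @locks = {}                # lock key => expiry time in seconds
  end

  # Emulates SET NX EX: succeeds only when no unexpired lock exists.
  def set_nx_ex(key, ttl, now)
    expires_at = @locks[key]
    return false if expires_at && expires_at > now

    @locks[key] = now + ttl
    true
  end
end

# Hypothetical lock key and TTL; only the 1/min rate comes from the source.
def sketch_cleanup(store, now)
  return 0 unless store.set_nx_ex('sketch-cleanup-lock', 60, now)

  # One `info` lookup per identity, like the pipelined HGETs.
  infos = store.processes.map { |id| store.hashes.dig(id, 'info') }
  to_prune = store.processes.zip(infos).filter_map { |id, info| id if info.nil? }

  # Equivalent of SREM: drop the dead identities and report how many.
  to_prune.each { |id| store.processes.delete(id) }
  to_prune.size
end
```

A second caller arriving within the TTL gets 0 back without scanning anything, which is the point of the rate limit.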

#each ⇒ Object

Pipelined identity → HMGET. Skips identities whose `info` field is gone (heartbeat expired between SMEMBERS and HMGET). Sorted by identity so iteration order is stable for dashboards. Returns an Enumerator when called without a block.



# File 'lib/wurk/process_set.rb', line 72

def each
  return enum_for(:each) unless block_given?

  rows = fetch_each_rows
  rows.each do |row|
    attrs = EACH_FIELDS.zip(row).to_h
    next if attrs['info'].nil?

    yield ProcessSet.send(:build_process, attrs)
  end
end
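
The iteration contract (block-less call returns an Enumerator, expired rows are skipped, order is by identity) can be shown with a small stand-in. This is a sketch only: `SketchEachSet` and its `FIELDS` list are hypothetical, `fetch_each_rows` is not shown on this page, and the rows here are supplied by hand instead of coming from Redis.

```ruby
class SketchEachSet
  include Enumerable

  # Hypothetical field list; the real EACH_FIELDS constant is not shown here.
  FIELDS = %w[info busy].freeze

  # rows: { identity => [info, busy] }, standing in for the HMGET replies.
  def initialize(rows)
    @rows = rows
  end

  def each
    return enum_for(:each) unless block_given?

    # Sort by identity so iteration order is stable.
    @rows.sort_by { |identity, _row| identity }.each do |identity, row|
      attrs = FIELDS.zip(row).to_h
      next if attrs['info'].nil? # hash expired after the identity was listed

      yield attrs.merge('identity' => identity)
    end
  end
end
```

Because the class includes `Enumerable`, methods such as `map`, `count`, and `sum` see only the live rows.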

#leader ⇒ Object

Cluster leader identity. Ent-only feature; OSS only reads the key. Memoized per instance, which is sufficient because dashboards re-instantiate ProcessSet per render. The empty-string fallback is truthy in Ruby, so `||=` caches the “leader is unset” result instead of treating it as “memoization not yet computed” and querying Redis again.



# File 'lib/wurk/process_set.rb', line 106

def leader
  @leader ||= Wurk.redis { |c| c.call('GET', 'dear-leader') } || ''
end
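
The effect of the empty-string fallback can be checked by counting fetches. A minimal sketch, assuming a hypothetical `SketchLeader` class whose `fetch` method stands in for the Redis GET:

```ruby
class SketchLeader
  attr_reader :fetches

  # stored: what GET would return, a String or nil when the key is unset.
  def initialize(stored)
    @stored = stored
    @fetches = 0
  end

  # Parsed as `@leader ||= (fetch || '')`. Since '' is truthy in Ruby,
  # an unset leader is memoized too and fetch runs only once.
  def leader
    @leader ||= fetch || ''
  end

  private

  # Stand-in for the Redis round trip; counts how often it is called.
  def fetch
    @fetches += 1
    @stored
  end
end
```

Without the `|| ''` fallback, a nil reply would leave `@leader` falsy and every call would hit Redis again.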

#size ⇒ Object

SCARD over `processes`. The call itself does not prune, so the result may include identities whose heartbeat has lapsed. Count the results of `each` for an accurate figure.



# File 'lib/wurk/process_set.rb', line 86

def size
  Wurk.redis { |conn| conn.call('SCARD', Keys::PROCESSES) }
end

#total_concurrency ⇒ Object

Sum of `concurrency` across live processes. Iterates `each` so dead identities are skipped.



# File 'lib/wurk/process_set.rb', line 92

def total_concurrency
  sum { |p| p['concurrency'].to_i }
end

#total_rss_in_kb ⇒ Object
Also known as: total_rss

Sum of `rss` (KB) across live processes.



# File 'lib/wurk/process_set.rb', line 97

def total_rss_in_kb
  sum { |p| p['rss'].to_i }
end
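
Both totals are plain `Enumerable#sum` calls over `each`, with `to_i` absorbing missing or string-typed values. A small sketch, assuming a hypothetical `SketchTotals` class that iterates plain hashes in place of Process objects:

```ruby
class SketchTotals
  include Enumerable

  # processes: an Array of Hashes standing in for live Process objects.
  def initialize(processes)
    @processes = processes
  end

  def each(&block)
    return enum_for(:each) unless block

    @processes.each(&block)
  end

  # nil.to_i is 0 and '10'.to_i is 10, so absent or string values are safe.
  def total_concurrency
    sum { |p| p['concurrency'].to_i }
  end

  def total_rss_in_kb
    sum { |p| p['rss'].to_i }
  end
  alias total_rss total_rss_in_kb
end
```

Since the sums go through `each`, anything `each` skips (for example expired identities) never contributes to the totals.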