Class: Wurk::ProcessSet
- Inherits:
-
Object
- Object
- Wurk::ProcessSet
- Includes:
- Enumerable
- Defined in:
- lib/wurk/process_set.rb
Overview
Live process list view backed by the ‘processes` SET + per-identity hashes. Cluster topology lives here: dashboard process list, total concurrency gauge, RSS roll-up, leader lookup.
Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §1 (Redis schema), §19.6.
Class Method Summary collapse
-
.[](identity) ⇒ Object
Fetch a single Process by identity.
Instance Method Summary collapse
-
#cleanup ⇒ Object
SREM identities whose ‘info` HASH field has expired.
-
#each ⇒ Object
Pipelined identity → HMGET.
-
#initialize(clean_plz = true) ⇒ ProcessSet
constructor
‘processes` SET only stores identity strings; the live hash at each identity expires every 60s, so SCARD can lag reality.
-
#leader ⇒ Object
Cluster leader identity.
-
#size ⇒ Object
SCARD over ‘processes`.
-
#total_concurrency ⇒ Object
Sum of ‘concurrency` across live processes.
-
#total_rss_in_kb ⇒ Object
(also: #total_rss)
Sum of ‘rss` (KB) across live processes.
Constructor Details
#initialize(clean_plz = true) ⇒ ProcessSet
‘processes` SET only stores identity strings; the live hash at each identity expires every 60s, so SCARD can lag reality. Constructor opts into a `cleanup` (rate-limited globally to 1/min) so the size reported below reflects the post-prune state — except when caller explicitly skips it (e.g. inside `cleanup` itself, or for snapshot reads on hot paths).
Positional Boolean matches Sidekiq’s public API — wire-compat sacred.
27 28 29 |
# File 'lib/wurk/process_set.rb', line 27 def initialize(clean_plz = true) # rubocop:disable Style/OptionalBooleanParameter cleanup if clean_plz end |
Class Method Details
.[](identity) ⇒ Object
Fetch a single Process by identity. Pipelined SISMEMBER + HMGET so absence (process never registered) and expiry (heartbeat lapsed, info field gone) both return nil.
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/wurk/process_set.rb', line 34 def self.[](identity) exists, fields = Wurk.redis do |conn| conn.pipelined do |pipe| pipe.call('SISMEMBER', Keys::PROCESSES, identity) pipe.call('HMGET', identity, *LOOKUP_FIELDS) end end return nil if exists.to_i.zero? || fields.first.nil? build_process(LOOKUP_FIELDS.zip(fields).to_h) end |
Instance Method Details
#cleanup ⇒ Object
SREM identities whose ‘info` HASH field has expired. Rate-limited globally to 1/min via SET NX EX so concurrent dashboards / API calls don’t dogpile the prune. Returns the number of identities removed (or 0 when the lock was held by someone else).
Spec: docs/target/sidekiq-free.md §31.17.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/wurk/process_set.rb', line 52 def cleanup return 0 unless acquired_cleanup_lock? Wurk.redis do |conn| procs = conn.call('SMEMBERS', Keys::PROCESSES) next 0 if procs.empty? heartbeats = conn.pipelined do |pipe| procs.each { |key| pipe.call('HGET', key, 'info') } end to_prune = procs.zip(heartbeats).filter_map { |id, info| id if info.nil? } next 0 if to_prune.empty? conn.call('SREM', Keys::PROCESSES, *to_prune).to_i end end |
#each ⇒ Object
Pipelined identity → HMGET. Skips identities whose ‘info` field is gone (heartbeat expired between SMEMBERS and HMGET). Sorted by identity so iteration order is stable for dashboards.
72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/wurk/process_set.rb', line 72 def each return enum_for(:each) unless block_given? rows = fetch_each_rows rows.each do |row| attrs = EACH_FIELDS.zip(row).to_h next if attrs['info'].nil? yield ProcessSet.send(:build_process, attrs) end end |
#leader ⇒ Object
Cluster leader identity. Ent-only feature; OSS only reads the key. Memoized per-instance: dashboards re-instantiate ProcessSet per render. ‘||=` with empty-string fallback distinguishes “leader is unset” from “memoization not yet computed”.
106 107 108 |
# File 'lib/wurk/process_set.rb', line 106 def leader @leader ||= Wurk.redis { |c| c.call('GET', 'dear-leader') } || '' end |
#size ⇒ Object
SCARD over ‘processes`. Not pruned — may include identities whose heartbeat has lapsed. Use `each` for the accurate count.
86 87 88 |
# File 'lib/wurk/process_set.rb', line 86 def size Wurk.redis { |conn| conn.call('SCARD', Keys::PROCESSES) } end |
#total_concurrency ⇒ Object
Sum of ‘concurrency` across live processes. Iterates `each` so dead identities are skipped.
92 93 94 |
# File 'lib/wurk/process_set.rb', line 92 def total_concurrency sum { |p| p['concurrency'].to_i } end |
#total_rss_in_kb ⇒ Object Also known as: total_rss
Sum of ‘rss` (KB) across live processes.
97 98 99 |
# File 'lib/wurk/process_set.rb', line 97 def total_rss_in_kb sum { |p| p['rss'].to_i } end |