Class: Wurk::WorkSet
- Inherits: Object
- Includes: Enumerable
- Defined in: lib/wurk/work_set.rb
Overview
Live snapshot of currently-executing jobs across the cluster. Reads the `<identity>:work` HASH of each registered process; each field maps a thread id to a JSON payload. The data can lag reality by up to one heartbeat (10s), because each heartbeat `UNLINK`s the hash and rewrites it atomically.
Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §19.7.
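To make the per-process hash layout concrete, here is a minimal sketch of decoding one `<identity>:work` reply. The hash literal stands in for an `HGETALL` result, and the payload keys (`queue`, `run_at`) are assumptions chosen for illustration; the real payload schema is defined by the spec referenced above, not by this example.

```ruby
require 'json'

# Hypothetical reply of HGETALL "<identity>:work" for one process:
# field = thread id, value = JSON payload. The payload keys used here
# ("queue", "run_at") are assumptions for illustration only.
raw_work_hash = {
  'tid-1' => JSON.generate('queue' => 'default', 'run_at' => 1_700_000_020),
  'tid-2' => JSON.generate('queue' => 'mailers', 'run_at' => 1_700_000_005)
}

# Decode every field into a [thread_id, parsed_payload] pair.
decoded = raw_work_hash.map { |thread_id, json| [thread_id, JSON.parse(json)] }

decoded.each do |thread_id, payload|
  puts "#{thread_id}: queue=#{payload['queue']} run_at=#{payload['run_at']}"
end
```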
Instance Method Summary
- #each ⇒ Object
  Pipelined `<identity>:work` HGETALL per known process.
- #find_work(jid) ⇒ Object (also: #find_work_by_jid)
  O(n) scan for a JID across all in-flight jobs.
- #initialize(processes_key: Keys::PROCESSES) ⇒ WorkSet (constructor)
  Optional `processes_key:` allows tests to operate on a namespaced SET; production callers always use `Keys::PROCESSES` (wire-compat).
- #size ⇒ Object
  Sum of the `busy` HASH field across every known identity.
Constructor Details
#initialize(processes_key: Keys::PROCESSES) ⇒ WorkSet
Optional `processes_key:` allows tests to operate on a namespaced SET; production callers always use `Keys::PROCESSES` (wire-compat).

    # File 'lib/wurk/work_set.rb', line 16
    def initialize(processes_key: Keys::PROCESSES)
      @processes_key = processes_key
    end
Instance Method Details
#each ⇒ Object
Pipelined `<identity>:work` HGETALL per known process. Yields (process_id, thread_id, Work). Results are sorted by `run_at` so that the oldest in-flight job appears first; dashboards rely on this order. Returns an Enumerator when called without a block.

    # File 'lib/wurk/work_set.rb', line 23
    def each
      return enum_for(:each) unless block_given?

      collect_rows.sort_by { |(_, _, work)| work.run_at }.each { |row| yield(*row) }
    end
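The ordering and the block-less fallback can be tried without Redis. The sketch below uses hypothetical stand-ins (`FakeWork`, `FakeWorkSet`) that are not part of Wurk; they only mirror the shape of `#each` shown above, with rows supplied from memory instead of `collect_rows`.

```ruby
# Hypothetical stand-ins for illustration; neither class exists in Wurk.
FakeWork = Struct.new(:jid, :run_at)

class FakeWorkSet
  include Enumerable

  # rows: array of [process_id, thread_id, work] triples
  def initialize(rows)
    @rows = rows
  end

  # Same shape as the documented #each: enumerator fallback, then rows
  # sorted by run_at and splatted into the block as three arguments.
  def each
    return enum_for(:each) unless block_given?

    @rows.sort_by { |(_, _, work)| work.run_at }.each { |row| yield(*row) }
  end
end

set = FakeWorkSet.new([
  ['host-a:1', 'tid-9', FakeWork.new('jid-new', 200)],
  ['host-b:2', 'tid-3', FakeWork.new('jid-old', 100)]
])

# With a block: rows arrive oldest first.
ordered_jids = []
set.each { |_process_id, _thread_id, work| ordered_jids << work.jid }

# Without a block: an Enumerator, so Enumerable helpers can be chained.
first_row = set.each.first
```

Because the three yielded values are packed into one array by Enumerable helpers such as `first`, `first_row` here is a `[process_id, thread_id, work]` triple.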
#find_work(jid) ⇒ Object Also known as: find_work_by_jid
O(n) scan for a JID across all in-flight jobs. Returns nil when there is no match. Slow, because every call re-reads the work of every process; not intended for application logic. Aliased as `find_work_by_jid` for Sidekiq wire-compat.

    # File 'lib/wurk/work_set.rb', line 46
    def find_work(jid)
      each do |_process_id, _thread_id, work|
        return work if work.job.jid == jid
      end
      nil
    end
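As a rough illustration of the scan, the sketch below runs the same early-return loop over an in-memory array. `DemoJob`, `DemoWork`, and `find_in_rows` are hypothetical names introduced only for this example; the real method iterates via `#each` and therefore reads Redis on every call.

```ruby
# Hypothetical stand-ins for illustration; these are not Wurk classes.
DemoJob = Struct.new(:jid)
DemoWork = Struct.new(:job)

# Linear scan with early return, mirroring the body of #find_work.
def find_in_rows(rows, jid)
  rows.each do |_process_id, _thread_id, work|
    return work if work.job.jid == jid
  end
  nil
end

rows = [
  ['host-a:1', 'tid-1', DemoWork.new(DemoJob.new('aaa111'))],
  ['host-a:1', 'tid-2', DemoWork.new(DemoJob.new('bbb222'))]
]

found = find_in_rows(rows, 'bbb222')
missing = find_in_rows(rows, 'zzz999')
```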
#size ⇒ Object
Sum of the `busy` HASH field across every known identity. Lagged by up to one heartbeat. Uses one pipelined HGET per process, so the pipeline grows linearly with the process count, but each call is O(1) on the Redis side.

    # File 'lib/wurk/work_set.rb', line 32
    def size
      Wurk.redis do |conn|
        procs = conn.call('SMEMBERS', @processes_key)
        # Nothing registered: skip the pipeline and report zero.
        next 0 if procs.empty?

        # One HGET per process identity; replies are summed as integers.
        conn.pipelined do |pipe|
          procs.each { |key| pipe.call('HGET', key, 'busy') }
        end.sum(&:to_i)
      end
    end
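The final `sum(&:to_i)` step can be checked in isolation. The array below is a hypothetical pipeline result, assuming Redis replies arrive as strings and as `nil` when the hash or field is missing (for example, a process that expired between the `SMEMBERS` and the `HGET`).

```ruby
# Hypothetical pipeline result: one HGET <identity> busy reply per process.
# Strings for present fields, nil for a missing hash or field (assumption).
busy_replies = ['3', '0', nil, '12']

# nil.to_i is 0 and '3'.to_i is 3, so missing entries count as zero
# instead of raising.
total_busy = busy_replies.sum(&:to_i)

puts total_busy
```

An empty array also sums to 0, which matches the early `next 0` return for a cluster with no registered processes.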