Class: Wurk::WorkSet

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/wurk/work_set.rb

Overview

Live snapshot of currently-executing jobs across the cluster. Reads ‘<identity>:work` HASH per registered process; each field is a thread id → JSON payload. The data lags reality by up to one heartbeat (10s) since heartbeats `UNLINK` and rewrite the hash atomically.

Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §19.7.

Instance Method Summary collapse

Constructor Details

#initialize(processes_key: Keys::PROCESSES) ⇒ WorkSet

Optional ‘processes_key:` allows tests to operate on a namespaced SET; production callers always use `Keys::PROCESSES` (wire-compat).



16
17
18
# File 'lib/wurk/work_set.rb', line 16

def initialize(processes_key: Keys::PROCESSES)
  @processes_key = processes_key
end

Instance Method Details

#eachObject

Pipelined ‘<identity>:work` HGETALL per known process. Yields (process_id, thread_id, Work). Result sorted by `run_at` so the oldest in-flight job appears first — dashboards rely on this order.



23
24
25
26
27
# File 'lib/wurk/work_set.rb', line 23

def each
  return enum_for(:each) unless block_given?

  collect_rows.sort_by { |(_, _, work)| work.run_at }.each { |row| yield(*row) }
end

#find_work(jid) ⇒ Object Also known as: find_work_by_jid

O(n) scan for a JID across all in-flight jobs. Returns nil when no match. Slow — not for app logic. Aliased as ‘find_work_by_jid` for Sidekiq wire-compat.



46
47
48
49
50
51
# File 'lib/wurk/work_set.rb', line 46

def find_work(jid)
  each do |_process_id, _thread_id, work|
    return work if work.job.jid == jid
  end
  nil
end

#sizeObject

Sum of ‘busy` HASH field across every known identity. Lagged by one heartbeat. Pipelined HGET — unbounded by process count but each call is O(1) on the Redis side.



32
33
34
35
36
37
38
39
40
41
# File 'lib/wurk/work_set.rb', line 32

def size
  Wurk.redis do |conn|
    procs = conn.call('SMEMBERS', @processes_key)
    next 0 if procs.empty?

    conn.pipelined do |pipe|
      procs.each { |key| pipe.call('HGET', key, 'busy') }
    end.sum(&:to_i)
  end
end