Class: Wurk::DeadSet

Inherits:
JobSet show all
Defined in:
lib/wurk/dead_set.rb

Overview

Capped ZSET of jobs that exhausted retries (the “morgue”). Bounded by ‘dead_max_jobs` and `dead_timeout_in_seconds` config knobs — every `kill` trims both axes. Death handlers fire on retry-exhausted kills (notify_failure: true), not on user-initiated UI kills.

Spec: docs/target/sidekiq-free.md §19.5, §17.2, §31.8.

Constant Summary

Constants inherited from SortedSet

SortedSet::PAGE_SIZE

Instance Attribute Summary

Attributes inherited from SortedSet

#name

Instance Method Summary collapse

Methods inherited from JobSet

#delete_by_jid, #delete_by_value, #each, #fetch, #find_job, #kill_all, #pop_each, #remove_job, #retry_all, #schedule

Methods inherited from SortedSet

#as_json, #clear, #scan, #size

Methods included from API::Fast::SortedSetExt

#scan

Constructor Details

#initialize(name = 'dead') ⇒ DeadSet

Optional ‘name` allows tests to operate on a namespaced ZSET; production callers always use the default `’dead’‘ key (wire-compat with Sidekiq).



15
16
17
# File 'lib/wurk/dead_set.rb', line 15

def initialize(name = 'dead')
  super
end

Instance Method Details

#kill(message, opts = {}) ⇒ Object

ZADD the raw JSON payload, trim, fire death handlers. ‘notify_failure: true` (default) routes the kill through the death-handler chain; UI-initiated kills pass false. `ex` is the originating exception (or synthesized RuntimeError when callers don’t have one) — death handlers receive ‘(job, ex)`. `max_jobs:` / `timeout:` propagate to the auto-trim; see `#trim` for the rationale.



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/wurk/dead_set.rb', line 48

def kill(message, opts = {}) # rubocop:disable Naming/PredicateMethod
  notify = opts.fetch(:notify_failure, true)
  do_trim = opts.fetch(:trim, true)
  ex = opts[:ex] || RuntimeError.new('Job killed')

  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis { |conn| conn.call('ZADD', @name, now.to_s, message) }
  trim(max_jobs: opts[:max_jobs], timeout: opts[:timeout]) if do_trim
  fire_death_handlers(message, ex) if notify
  true
end

#trim(max_jobs: nil, timeout: nil) ⇒ Object

Two-axis trim: ‘ZREMRANGEBYSCORE` evicts entries older than `dead_timeout_in_seconds`, `ZREMRANGEBYRANK 0 -dead_max_jobs` keeps the count bounded. Pipelined — partial failure leaves at most one axis applied (acceptable; trim is non-critical, runs again next kill).

‘max_jobs:` / `timeout:` override the global config for this call. Lets parallel tests run trim with isolated limits without mutating `Wurk.configuration` (which is process-global and races across threads).



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/wurk/dead_set.rb', line 27

def trim(max_jobs: nil, timeout: nil) # rubocop:disable Naming/PredicateMethod
  config = Wurk.configuration
  max_jobs ||= config[:dead_max_jobs] || 10_000
  timeout ||= config[:dead_timeout_in_seconds] || (180 * 24 * 60 * 60)
  cutoff = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - timeout

  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('ZREMRANGEBYSCORE', @name, '-inf', "(#{cutoff}")
      pipe.call('ZREMRANGEBYRANK', @name, 0, -(max_jobs + 1))
    end
  end
  true
end