Class: Wurk::JobSet
Overview
ZSET-of-jobs view. Reverse-paged iteration so callers see newest-first (highest score, i.e. furthest-out retry/schedule). Mutations use ZREM-by-value when the exact bytes are known and a (score, jid) scan otherwise.
Spec: docs/target/sidekiq-free.md §19.5.
Direct Known Subclasses
Constant Summary
Constants inherited from SortedSet
Instance Attribute Summary
Attributes inherited from SortedSet
Instance Method Summary collapse
-
#delete_by_jid(score, jid) ⇒ Object
(also: #delete)
Scan the score bracket for a jid match, ZREM the exact bytes once found.
-
#delete_by_value(name, value) ⇒ Object
ZREM by exact bytes.
-
#each ⇒ Object
Newest-first paged ZRANGE.
-
#fetch(score, jid = nil) ⇒ Object
O(score) lookup.
-
#find_job(jid) ⇒ Object
ZSCAN-based search by jid substring.
-
#kill_all(notify_failure: false, ex: nil) ⇒ Object
Moves every job in this set to the dead set.
-
#pop_each ⇒ Object
ZPOPMIN loop.
-
#remove_job(entry) ⇒ Object
Removes the exact (score, jid)-matching member.
-
#retry_all ⇒ Object
Re-enqueues every job in this set via the client.
-
#schedule(timestamp, message) ⇒ Object
ZADD with NX so re-scheduling the same payload doesn’t reset its score.
Methods inherited from SortedSet
#as_json, #clear, #initialize, #scan, #size
Methods included from API::Fast::SortedSetExt
Constructor Details
This class inherits a constructor from Wurk::SortedSet
Instance Method Details
#delete_by_jid(score, jid) ⇒ Object Also known as: delete
Scan the score bracket for a jid match, ZREM the exact bytes once found. Returns true on success. Aliased as ‘delete` for Sidekiq wire-compat. Per-row JSON rescue so a single malformed entry can’t shadow a valid match at the same score.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/wurk/job_set.rb', line 181 def delete_by_jid(score, jid) # rubocop:disable Naming/PredicateMethod Wurk.redis do |conn| rows = conn.call('ZRANGEBYSCORE', @name, score.to_f, score.to_f) rows.each do |raw| parsed = begin Wurk.load_json(raw) rescue ::JSON::ParserError nil end next unless parsed && parsed['jid'] == jid return conn.call('ZREM', @name, raw).to_i.positive? end end false end |
#delete_by_value(name, value) ⇒ Object
ZREM by exact bytes. Returns true when ≥1 element was removed. Method name is Sidekiq wire-compat — ‘delete_by_value?` would break the alias.
172 173 174 175 |
# File 'lib/wurk/job_set.rb', line 172 def delete_by_value(name, value) # rubocop:disable Naming/PredicateMethod removed = Wurk.redis { |conn| conn.call('ZREM', name, value) } removed.to_i.positive? end |
#each ⇒ Object
Newest-first paged ZRANGE. Yields a SortedEntry per row.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/wurk/job_set.rb', line 75 def each return enum_for(:each) unless block_given? page = 0 added = 0 loop do start = page * PAGE_SIZE stop = start + PAGE_SIZE - 1 slice = Wurk.redis { |conn| conn.call('ZRANGE', @name, start, stop, 'REV', 'WITHSCORES') } slice.each do |value, score| yield SortedEntry.new(self, score, value) added += 1 end break if slice.size < PAGE_SIZE page += 1 end added end |
#fetch(score, jid = nil) ⇒ Object
O(score) lookup. ‘score` accepts Time, Numeric, or a Range of either. Returns the matching entries (possibly multiple at the same exact score). When `jid` is set, narrows to the single (score, jid) pair.
145 146 147 148 149 150 151 |
# File 'lib/wurk/job_set.rb', line 145 def fetch(score, jid = nil) results = Wurk.redis { |conn| conn.call('ZRANGEBYSCORE', @name, *range_args(score), 'WITHSCORES') } entries = results.map { |value, sc| SortedEntry.new(self, sc, value) } return entries unless jid entries.select { |e| e.jid == jid } end |
#find_job(jid) ⇒ Object
ZSCAN-based search by jid substring. Returns the first matching entry or nil. O(n) on the ZSET — callers iterating many jids should switch to per-jid hashes or the score-based fetch.
156 157 158 159 160 161 162 |
# File 'lib/wurk/job_set.rb', line 156 def find_job(jid) scan(jid) do |value, score| entry = SortedEntry.new(self, score, value) return entry if entry.jid == jid end nil end |
#kill_all(notify_failure: false, ex: nil) ⇒ Object
Moves every job in this set to the dead set. ‘notify_failure: false` because this is a UI-initiated bulk action, not a retry-exhausted event. Returns the count of jobs moved.
128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/wurk/job_set.rb', line 128 def kill_all(notify_failure: false, ex: nil) count = 0 dead = DeadSet.new until size.zero? each do |entry| entry.send(:remove_job) do || dead.kill(Wurk.dump_json(), notify_failure: notify_failure, ex: ex) end count += 1 end end count end |
#pop_each ⇒ Object
ZPOPMIN loop. Each iteration pops the single oldest member (lowest score, e.g. earliest scheduled-at) and yields the raw JSON + score. Used by the scheduled-poller: it enqueues each popped job through the client. Stops when the set is empty.
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/wurk/job_set.rb', line 99 def pop_each loop do result = Wurk.redis { |conn| conn.call('ZPOPMIN', @name, 1) } break if result.nil? || result.empty? # Newer redis-client returns nested `[[value, score]]` even with COUNT 1; # older `[value, score]`. Normalize both. value, score = result.first.is_a?(Array) ? result.first : result yield value, score.to_f end end |
#remove_job(entry) ⇒ Object
Removes the exact (score, jid)-matching member. Backs SortedEntry#delete when no cached value bytes are present.
166 167 168 |
# File 'lib/wurk/job_set.rb', line 166 def remove_job(entry) delete_by_value(@name, entry.value) || delete_by_jid(entry.score, entry.jid) end |
#retry_all ⇒ Object
Re-enqueues every job in this set via the client. Lossy on errors mid-iteration; callers expecting transactional behavior should batch the work themselves.
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/wurk/job_set.rb', line 114 def retry_all count = 0 until size.zero? each do |entry| entry.retry count += 1 end end count end |