Class: Wurk::JobSet

Inherits:
SortedSet show all
Defined in:
lib/wurk/job_set.rb

Overview

ZSET-of-jobs view. Reverse-paged iteration so callers see newest-first (highest score, i.e. furthest-out retry/schedule). Mutations use ZREM-by-value when the exact bytes are known and a (score, jid) scan otherwise.

Spec: docs/target/sidekiq-free.md §19.5.

Direct Known Subclasses

DeadSet, RetrySet, ScheduledSet

Constant Summary

Constants inherited from SortedSet

SortedSet::PAGE_SIZE

Instance Attribute Summary

Attributes inherited from SortedSet

#name

Instance Method Summary collapse

Methods inherited from SortedSet

#as_json, #clear, #initialize, #scan, #size

Methods included from API::Fast::SortedSetExt

#scan

Constructor Details

This class inherits a constructor from Wurk::SortedSet

Instance Method Details

#delete_by_jid(score, jid) ⇒ Object Also known as: delete

Scan the score bracket for a jid match, ZREM the exact bytes once found. Returns true on success. Aliased as ‘delete` for Sidekiq wire-compat. Per-row JSON rescue so a single malformed entry can’t shadow a valid match at the same score.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/wurk/job_set.rb', line 181

def delete_by_jid(score, jid) # rubocop:disable Naming/PredicateMethod
  Wurk.redis do |conn|
    rows = conn.call('ZRANGEBYSCORE', @name, score.to_f, score.to_f)
    rows.each do |raw|
      parsed = begin
        Wurk.load_json(raw)
      rescue ::JSON::ParserError
        nil
      end
      next unless parsed && parsed['jid'] == jid

      return conn.call('ZREM', @name, raw).to_i.positive?
    end
  end
  false
end

#delete_by_value(name, value) ⇒ Object

ZREM by exact bytes. Returns true when ≥1 element was removed. Method name is Sidekiq wire-compat — ‘delete_by_value?` would break the alias.



172
173
174
175
# File 'lib/wurk/job_set.rb', line 172

def delete_by_value(name, value) # rubocop:disable Naming/PredicateMethod
  removed = Wurk.redis { |conn| conn.call('ZREM', name, value) }
  removed.to_i.positive?
end

#eachObject

Newest-first paged ZRANGE. Yields a SortedEntry per row.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/wurk/job_set.rb', line 75

def each
  return enum_for(:each) unless block_given?

  page  = 0
  added = 0
  loop do
    start = page * PAGE_SIZE
    stop  = start + PAGE_SIZE - 1
    slice = Wurk.redis { |conn| conn.call('ZRANGE', @name, start, stop, 'REV', 'WITHSCORES') }
    slice.each do |value, score|
      yield SortedEntry.new(self, score, value)
      added += 1
    end
    break if slice.size < PAGE_SIZE

    page += 1
  end
  added
end

#fetch(score, jid = nil) ⇒ Object

O(score) lookup. ‘score` accepts Time, Numeric, or a Range of either. Returns the matching entries (possibly multiple at the same exact score). When `jid` is set, narrows to the single (score, jid) pair.



145
146
147
148
149
150
151
# File 'lib/wurk/job_set.rb', line 145

def fetch(score, jid = nil)
  results = Wurk.redis { |conn| conn.call('ZRANGEBYSCORE', @name, *range_args(score), 'WITHSCORES') }
  entries = results.map { |value, sc| SortedEntry.new(self, sc, value) }
  return entries unless jid

  entries.select { |e| e.jid == jid }
end

#find_job(jid) ⇒ Object

ZSCAN-based search by jid substring. Returns the first matching entry or nil. O(n) on the ZSET — callers iterating many jids should switch to per-jid hashes or the score-based fetch.



156
157
158
159
160
161
162
# File 'lib/wurk/job_set.rb', line 156

def find_job(jid)
  scan(jid) do |value, score|
    entry = SortedEntry.new(self, score, value)
    return entry if entry.jid == jid
  end
  nil
end

#kill_all(notify_failure: false, ex: nil) ⇒ Object

Moves every job in this set to the dead set. ‘notify_failure: false` because this is a UI-initiated bulk action, not a retry-exhausted event. Returns the count of jobs moved.



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/wurk/job_set.rb', line 128

def kill_all(notify_failure: false, ex: nil)
  count = 0
  dead = DeadSet.new
  until size.zero?
    each do |entry|
      entry.send(:remove_job) do |message|
        dead.kill(Wurk.dump_json(message), notify_failure: notify_failure, ex: ex)
      end
      count += 1
    end
  end
  count
end

#pop_eachObject

ZPOPMIN loop. Each iteration pops the single oldest member (lowest score, e.g. earliest scheduled-at) and yields the raw JSON + score. Used by the scheduled-poller: it enqueues each popped job through the client. Stops when the set is empty.



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/wurk/job_set.rb', line 99

def pop_each
  loop do
    result = Wurk.redis { |conn| conn.call('ZPOPMIN', @name, 1) }
    break if result.nil? || result.empty?

    # Newer redis-client returns nested `[[value, score]]` even with COUNT 1;
    # older `[value, score]`. Normalize both.
    value, score = result.first.is_a?(Array) ? result.first : result
    yield value, score.to_f
  end
end

#remove_job(entry) ⇒ Object

Removes the exact (score, jid)-matching member. Backs SortedEntry#delete when no cached value bytes are present.



166
167
168
# File 'lib/wurk/job_set.rb', line 166

def remove_job(entry)
  delete_by_value(@name, entry.value) || delete_by_jid(entry.score, entry.jid)
end

#retry_allObject

Re-enqueues every job in this set via the client. Lossy on errors mid-iteration; callers expecting transactional behavior should batch the work themselves.



114
115
116
117
118
119
120
121
122
123
# File 'lib/wurk/job_set.rb', line 114

def retry_all
  count = 0
  until size.zero?
    each do |entry|
      entry.retry
      count += 1
    end
  end
  count
end

#schedule(timestamp, message) ⇒ Object

ZADD with NX so re-scheduling the same payload doesn’t reset its score. Mirrors Sidekiq::JobSet#schedule exactly.



70
71
72
# File 'lib/wurk/job_set.rb', line 70

def schedule(timestamp, message)
  Wurk.redis { |conn| conn.call('ZADD', @name, timestamp.to_f.to_s, Wurk.dump_json(message)) }
end