Class: Sidekiq::WorkSet
- Inherits: Object
  - Object
  - Sidekiq::WorkSet
- Includes: Enumerable
- Defined in: lib/sidekiq/api.rb
Overview
The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.
WARNING WARNING WARNING
This is live data that can change every millisecond. If #size returns 5 and you then expect #each to yield 5 times, you’re going to have a bad time.
  works = Sidekiq::WorkSet.new
  works.size # => 2
  works.each do |process_id, thread_id, work|
    # process_id is a unique identifier per Sidekiq process
    # thread_id is a unique identifier per thread
    # work is a `Sidekiq::Work` instance that has the following accessor methods.
    # [work.queue, work.run_at, work.payload]
  end
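Because the data is live, one way to keep a count and an iteration consistent with each other is to take a single snapshot and work only from it. The sketch below assumes nothing beyond `WorkSet` including `Enumerable`; `snapshot_work` and `FakeWorkSet` are hypothetical names, the latter an in-memory stand-in so the example runs without Redis.

```ruby
# Hypothetical helper, not part of Sidekiq: reads the set once and
# derives the count from that same snapshot.
def snapshot_work(work_set)
  rows = work_set.to_a # one pass over #each; each row is [process_id, thread_id, work]
  { count: rows.size, rows: rows }
end

# In-memory stand-in for Sidekiq::WorkSet (assumption, for illustration only).
class FakeWorkSet
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  def each(&block)
    @rows.each(&block)
  end
end

fake = FakeWorkSet.new([
  ["host:1:abc", "tid1", :work_a],
  ["host:1:abc", "tid2", :work_b]
])
snapshot = snapshot_work(fake)
snapshot[:rows].each do |process_id, thread_id, work|
  puts "#{process_id} #{thread_id} #{work}"
end
```

Against a real cluster the equivalent call would presumably be `snapshot_work(Sidekiq::WorkSet.new)`. The snapshot is internally consistent, but it is already stale by the time it is returned.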
Instance Method Summary
- #each(&block) ⇒ Object
- #find_work(jid) ⇒ Sidekiq::Work (also: #find_work_by_jid)
  Find the work which represents a job with the given JID.
- #size ⇒ Object
  Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds.
Instance Method Details
#each(&block) ⇒ Object
  # File 'lib/sidekiq/api.rb', line 1253
  def each(&block)
    results = []
    procs = nil
    all_works = nil

    Sidekiq.redis do |conn|
      procs = conn.sscan("processes").to_a.sort
      all_works = conn.pipelined do |pipeline|
        procs.each do |key|
          pipeline.hgetall("#{key}:work")
        end
      end
    end

    procs.zip(all_works).each do |key, workers|
      workers.each_pair do |tid, json|
        results << [key, tid, Sidekiq::Work.new(key, tid, Sidekiq.load_json(json))] unless json.empty?
      end
    end

    results.sort_by { |(_, _, work)| work.run_at }.each(&block)
  end
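As a usage sketch, the triples yielded by #each can be tallied per process. `busy_threads_by_process` is a hypothetical helper, not part of Sidekiq; it accepts anything whose `each` yields `[process_id, thread_id, work]`, so a plain array with made-up identifiers stands in for a live `WorkSet` here.

```ruby
# Hypothetical helper, not part of Sidekiq: counts yielded work entries
# per process identifier.
def busy_threads_by_process(work_set)
  counts = Hash.new(0)
  work_set.each do |process_id, _thread_id, _work|
    counts[process_id] += 1
  end
  counts
end

# A plain array of triples stands in for Sidekiq::WorkSet (assumption).
rows = [
  ["host-a:1", "t1", :job1],
  ["host-a:1", "t2", :job2],
  ["host-b:7", "t1", :job3]
]
tally = busy_threads_by_process(rows)
puts tally.inspect
```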
#find_work(jid) ⇒ Sidekiq::Work Also known as: find_work_by_jid
Find the work which represents a job with the given JID. *This is a slow O(n) operation*. Do not use for app logic.
  # File 'lib/sidekiq/api.rb', line 1303
  def find_work(jid)
    each do |_process_id, _thread_id, work|
      job = work.job
      return work if job.jid == jid
    end
    nil
  end
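Since each #find_work call walks the whole set, looking up several JIDs may be cheaper with a single pass that builds an index. A minimal sketch, relying only on `work.job.jid` as used in the source above; `index_work_by_jid` and the `Struct` stand-ins are hypothetical and exist only so the example runs without Redis.

```ruby
# Stand-ins for Sidekiq::Work and its job record (assumptions for illustration).
FakeJob = Struct.new(:jid)
FakeWork = Struct.new(:job)

# Hypothetical helper, not part of Sidekiq: one pass over the set,
# returning a Hash of JID => work.
def index_work_by_jid(work_set)
  index = {}
  work_set.each do |_process_id, _thread_id, work|
    index[work.job.jid] = work
  end
  index
end

rows = [
  ["p1", "t1", FakeWork.new(FakeJob.new("jid-1"))],
  ["p2", "t1", FakeWork.new(FakeJob.new("jid-2"))]
]
by_jid = index_work_by_jid(rows)
puts by_jid.key?("jid-2")
```

The index is a snapshot like any other read of this set, so the same caution about app logic applies.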
#size ⇒ Object
Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds. It is NOT real-time.
This is not very efficient if you have lots of Sidekiq processes, but the alternative is a global counter, which can easily get out of sync when processes crash.
  # File 'lib/sidekiq/api.rb', line 1282
  def size
    Sidekiq.redis do |conn|
      procs = conn.sscan("processes").to_a
      if procs.empty?
        0
      else
        conn.pipelined { |pipeline|
          procs.each do |key|
            pipeline.hget(key, "busy")
          end
        }.sum(&:to_i)
      end
    end
  end
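The final `.sum(&:to_i)` step can be tried in isolation. The sketch assumes the pipelined `hget` replies arrive as strings, with `nil` for a process hash that has no `busy` field; `total_busy` is a hypothetical name and the sample replies are made up.

```ruby
# Hypothetical helper mirroring the last step of #size: sums per-process
# "busy" replies, converting each one with to_i.
def total_busy(replies)
  replies.sum(&:to_i)
end

# Sample replies (made up): strings from Redis, nil where the field is missing.
replies = ["3", nil, "0", "2"]
total = total_busy(replies)
puts total
```

Because `nil.to_i` is `0` in Ruby, a process whose `busy` field is missing contributes nothing to the total rather than raising.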