Class: Sidekiq::WorkSet
- Inherits:
-
Object
- Object
- Sidekiq::WorkSet
- Includes:
- Enumerable
- Defined in:
- lib/sidekiq/api.rb
Overview
The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.
WARNING WARNING WARNING
This is live data that can change every millisecond. If you call #size => 5 and then expect #each to be called 5 times, you’re going to have a bad time.
works = Sidekiq::WorkSet.new
works.size => 2
works.each do |process_id, thread_id, work|
# process_id is a unique identifier per Sidekiq process
# thread_id is a unique identifier per thread
# work is a `Sidekiq::Work` instance that has the following accessor methods.
# [work.queue, work.run_at, work.payload]
end
Instance Method Summary collapse
- #each(&block) ⇒ Object
-
#find_work(jid) ⇒ Sidekiq::Work
(also: #find_work_by_jid)
Find the work which represents a job with the given JID.
-
#size ⇒ Object
Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds.
Instance Method Details
#each(&block) ⇒ Object
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 |
# File 'lib/sidekiq/api.rb', line 1257 def each(&block) results = [] procs = nil all_works = nil Sidekiq.redis do |conn| procs = conn.sscan("processes").to_a.sort all_works = conn.pipelined do |pipeline| procs.each do |key| pipeline.hgetall("#{key}:work") end end end procs.zip(all_works).each do |key, workers| workers.each_pair do |tid, json| results << [key, tid, Sidekiq::Work.new(key, tid, Sidekiq.load_json(json))] unless json.empty? end end results.sort_by { |(_, _, work)| work.run_at }.each(&block) end |
#find_work(jid) ⇒ Sidekiq::Work Also known as: find_work_by_jid
Find the work which represents a job with the given JID. *This is a slow O(n) operation*. Do not use for app logic.
1307 1308 1309 1310 1311 1312 1313 |
# File 'lib/sidekiq/api.rb', line 1307 def find_work(jid) each do |_process_id, _thread_id, work| job = work.job return work if job.jid == jid end nil end |
#size ⇒ Object
Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds. It is NOT real-time.
Not very efficient if you have lots of Sidekiq processes but the alternative is a global counter which can easily get out of sync with crashy processes.
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 |
# File 'lib/sidekiq/api.rb', line 1286 def size Sidekiq.redis do |conn| procs = conn.sscan("processes").to_a if procs.empty? 0 else conn.pipelined { |pipeline| procs.each do |key| pipeline.hget(key, "busy") end }.sum(&:to_i) end end end |