Class: Sidekiq::Routing::Sweeper
- Inherits:
-
Object
- Object
- Sidekiq::Routing::Sweeper
- Defined in:
- lib/sidekiq/routing/sweeper.rb
Overview
Eagerly clears the already-enqueued backlog of a parked class out of its live queue(s) and into the parking queue. Explicit operator action — never automatic on park. For a 100Ks+ backlog, prefer drain-in-place over moving millions of jobs.
Instance Method Summary collapse
Instance Method Details
#call(klass_name, queue: nil, limit: nil, batch_size: nil) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/sidekiq/routing/sweeper.rb', line 10 def call(klass_name, queue: nil, limit: nil, batch_size: nil) limit ||= Routing.configuration.batch_limit requested_queue = queue.to_s.empty? ? nil : queue target_queues = Array(requested_queue || default_queues_for(klass_name)) moved = 0 target_queues.each do |source| next if source.to_s == Routing.parked_queue Sidekiq::Queue.new(source).each do |job| break if limit && moved >= limit next unless job.display_class == klass_name Mover.move( job.item, Routing.parked_queue, ORIGINAL_QUEUE_KEY => job.item[ORIGINAL_QUEUE_KEY] || source.to_s) moved += 1 if job.delete end end Routing.logger.warn( "[Routing] swept #{moved} #{klass_name} job(s) into #{Routing.parked_queue}" ) moved end |