Class: Sidekiq::Routing::Sweeper

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/routing/sweeper.rb

Overview

Eagerly clears the already-enqueued backlog of a parked class out of its live queue(s) and into the parking queue. Explicit operator action — never automatic on park. For a 100Ks+ backlog, prefer drain-in-place over moving millions of jobs.

Instance Method Summary collapse

Instance Method Details

#call(klass_name, queue: nil, limit: nil, batch_size: nil) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/sidekiq/routing/sweeper.rb', line 10

def call(klass_name, queue: nil, limit: nil, batch_size: nil)
  limit ||= Routing.configuration.batch_limit
  requested_queue = queue.to_s.empty? ? nil : queue
  target_queues = Array(requested_queue || default_queues_for(klass_name))
  moved = 0

  target_queues.each do |source|
    next if source.to_s == Routing.parked_queue

    Sidekiq::Queue.new(source).each do |job|
      break if limit && moved >= limit
      next unless job.display_class == klass_name

      Mover.move(
        job.item, Routing.parked_queue,
        ORIGINAL_QUEUE_KEY => job.item[ORIGINAL_QUEUE_KEY] || source.to_s)
      moved += 1 if job.delete
    end
  end

  Routing.logger.warn(
    "[Routing] swept #{moved} #{klass_name} job(s) into #{Routing.parked_queue}"
  )
  moved
end