Class: Sidekiq::Routing::ParkedProcessor
- Inherits:
-
Object
- Object
- Sidekiq::Routing::ParkedProcessor
- Defined in:
- lib/sidekiq/routing/parked_processor.rb
Overview
Moves parked jobs back to their original queue by rewriting the “queue” field inside the payload (read item -> set queue -> push -> delete). Because the payload’s queue is now the original, a processed parked job that later fails retries to its original queue, NOT the parking queue.
Stamps NO_DIVERT_KEY so the job is not bounced straight back to parked even if the route is still active (recommended order is still: unpark, then process_parked).
Instance Method Summary collapse
Instance Method Details
#call(klass: nil, limit: nil, batch_size: nil) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/sidekiq/routing/parked_processor.rb', line 14 def call(klass: nil, limit: nil, batch_size: nil) limit ||= Routing.configuration.batch_limit fallback = Routing.configuration.process_parked_fallback_queue moved = 0 Sidekiq::Queue.new(Routing.parked_queue).each do |job| break if limit && moved >= limit next if klass && job.display_class != klass target = job.item[ORIGINAL_QUEUE_KEY] unless target target = fallback Routing.logger.warn( "[Routing] #{job.display_class} #{job.jid} had no original queue; processing parked job to #{fallback}" ) end payload = job.item.reject { |key, _| key == ORIGINAL_QUEUE_KEY } Mover.move(payload, target, NO_DIVERT_KEY => true) moved += 1 if job.delete end Routing.logger.warn("[Routing] processed #{moved} parked job(s) from #{Routing.parked_queue}") moved end |