Class: Sidekiq::LimitFetch
- Inherits:
-
BasicFetch
- Object
- BasicFetch
- Sidekiq::LimitFetch
- Defined in:
- lib/sidekiq/limit_fetch.rb,
lib/sidekiq/limit_fetch/global.rb,
lib/sidekiq/limit_fetch/heartbeat.rb,
lib/sidekiq/limit_fetch/unit_of_work.rb,
lib/sidekiq/limit_fetch/global/selector.rb,
lib/sidekiq/limit_fetch/global/semaphore.rb
Overview
A Sidekiq fetch strategy that enforces global and per-process concurrency limits on queues. Replaces BasicFetch and uses a Lua script to atomically check limits and pop jobs from Redis.
Defined Under Namespace
Classes: Global, Heartbeat, Shutdown, UnitOfWork
Instance Attribute Summary collapse
-
#capsule ⇒ Object
readonly
Returns the value of attribute capsule.
Class Method Summary collapse
-
.configuration ⇒ Object
Configurable values for Sidekiq::LimitFetch.
- .setup(capsule) ⇒ Object
Instance Method Summary collapse
-
#initialize(capsule) ⇒ LimitFetch
constructor
A new instance of LimitFetch.
- #poll_interval ⇒ Float
-
#retrieve_work ⇒ UnitOfWork?
Lua: * If limits permit us to take a job, try to take a job (RPOP) from the Sidekiq queue * If a job is found: * Add the current process UUID to the internal “busy” list for that queue * Return Sidekiq job string to the client.
- #selector ⇒ Global::Selector
Constructor Details
#initialize(capsule) ⇒ LimitFetch
Returns a new instance of LimitFetch.
69 70 71 72 |
# File 'lib/sidekiq/limit_fetch.rb', line 69 def initialize(capsule) @capsule = capsule super end |
Instance Attribute Details
#capsule ⇒ Object (readonly)
Returns the value of attribute capsule.
66 67 68 |
# File 'lib/sidekiq/limit_fetch.rb', line 66 def capsule @capsule end |
Class Method Details
.configuration ⇒ Object
Configurable values for Sidekiq::LimitFetch.
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/sidekiq/limit_fetch.rb', line 22 def self.configuration @configuration ||= { # Workers will sleep for a random number in this range when no jobs are found. # This means when there is no queue backlog the maximum time to pick up a job will be within this range. # Queues with a backlog will continue to process jobs immediately. poll_range: Range.new(0.400, 0.500), # Time between heartbeats. heartbeat_period: 15, } end |
.setup(capsule) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/sidekiq/limit_fetch.rb', line 42 def self.setup(capsule) = Global.init_capsule(capsule) limits = capsule[:limits] || {} process_limits = capsule[:process_limits] || {} .queue_set.each do |queue_name| queue = Global::QueueSemaphore.new(capsule, queue_name) queue_sym = queue_name.to_sym # Apply process limit if queue.process_limit.nil? queue.process_limit = process_limits[queue_sym] end # Apply global limit if queue.limit.nil? queue.limit = limits[queue_sym] end end LimitFetch::Heartbeat.new(capsule).start end |
Instance Method Details
#poll_interval ⇒ Float
92 93 94 |
# File 'lib/sidekiq/limit_fetch.rb', line 92 def poll_interval Random.rand(self.class.configuration[:poll_range]) end |
#retrieve_work ⇒ UnitOfWork?
Lua:
* If limits permit us to take a job, try to take a job (RPOP) from the Sidekiq queue
* If a job is found:
* Add the current process UUID to the internal "busy" list for that queue
* Return Sidekiq job string to the client
81 82 83 84 85 86 87 88 89 |
# File 'lib/sidekiq/limit_fetch.rb', line 81 def retrieve_work ordered_queues = queues_cmd # BasicFetch method handles randomization or strict ordering job_queue, job_str = selector.limit_fetch(ordered_queues) return UnitOfWork.new(job_queue, job_str, capsule) if job_str Kernel.sleep(poll_interval) nil end |
#selector ⇒ Global::Selector
97 98 99 |
# File 'lib/sidekiq/limit_fetch.rb', line 97 def selector @selector ||= Global::Selector.new(capsule) end |