Class: Sidekiq::LimitFetch::Global::QueueSemaphore
- Inherits: SemaphoreBase
- Inheritance chain: Object, SemaphoreBase, Sidekiq::LimitFetch::Global::QueueSemaphore
- Defined in: lib/sidekiq/limit_fetch/global/semaphore.rb
Overview
Manages concurrency semaphore state for an individual queue. Stores global limits, per-process limits, and the busy list in Redis.
Constant Summary
Constants inherited from SemaphoreBase
Instance Method Summary
- #initialize(capsule, queue_name) ⇒ QueueSemaphore (constructor)
  A new instance of QueueSemaphore.
- #limit ⇒ Integer?
  Returns the global concurrency limit for this queue.
- #limit=(value) ⇒ Object
  Sets the global concurrency limit for this queue.
- #process_limit ⇒ Integer?
  Returns the per-process concurrency limit for this queue.
- #process_limit=(value) ⇒ Object
  Sets the per-process concurrency limit for this queue.
- #release ⇒ Object
  Releases one busy slot for this capsule on the queue.
Constructor Details
#initialize(capsule, queue_name) ⇒ QueueSemaphore
Returns a new instance of QueueSemaphore.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32
def initialize(capsule, queue_name)
  super(capsule)
  queue_name = queue_name.delete_prefix("queue:")
  @prefix = "#{PREFIX}:queue:#{queue_name}"
end
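The constructor normalizes the queue name before building the Redis key prefix, so "queue:default" and "default" should end up sharing the same keys. The following is a minimal sketch of that derivation in isolation. The module name KeySketch, the method prefix_for, and the value "limit_fetch" are placeholders chosen for illustration; the real PREFIX constant is inherited from SemaphoreBase and its value is not shown on this page.

```ruby
# Standalone sketch of the key-prefix derivation used by the constructor.
module KeySketch
  # Hypothetical value; the actual PREFIX comes from SemaphoreBase.
  PREFIX = "limit_fetch"

  # Builds the per-queue key prefix, accepting the queue name with or
  # without a leading "queue:".
  def self.prefix_for(queue_name)
    name = queue_name.delete_prefix("queue:")
    "#{PREFIX}:queue:#{name}"
  end
end

KeySketch.prefix_for("queue:default") # => "limit_fetch:queue:default"
KeySketch.prefix_for("default")       # => "limit_fetch:queue:default"
```

Under this assumption, the keys read and written by the instance methods would be the prefix followed by ":limit", ":process_limit", and ":busy".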
Instance Method Details
#limit ⇒ Integer?
Returns the global concurrency limit for this queue, or nil if no limit is stored.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41
def limit
  redis { |conn| conn.get("#{@prefix}:limit") }&.to_i
end
#limit=(value) ⇒ Object
Sets the global concurrency limit for this queue. Passing nil deletes the stored limit.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 48
def limit=(value)
  if value
    redis { |conn| conn.set("#{@prefix}:limit", value) }
  else
    redis { |conn| conn.del("#{@prefix}:limit") }
  end
end
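The getter and setter form a round trip: a value is stored as a string, read back through to_i, and removed entirely when nil is assigned. The sketch below reproduces that behavior against an in-memory stand-in so it can run without a Redis server. FakeRedis and LimitSketch are hypothetical names introduced here and are not part of the library; only the get/set/del logic mirrors the listing above.

```ruby
# In-memory stand-in for the three Redis commands used by the accessors.
class FakeRedis
  def initialize
    @data = {}
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value.to_s # Redis stores values as strings
    "OK"
  end

  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

# Hypothetical class mirroring the #limit / #limit= logic.
class LimitSketch
  def initialize(conn, prefix)
    @conn = conn
    @prefix = prefix
  end

  # Returns the stored limit as an Integer, or nil when the key is absent.
  def limit
    @conn.get("#{@prefix}:limit")&.to_i
  end

  # Stores the limit, or deletes the key when value is nil or false.
  def limit=(value)
    if value
      @conn.set("#{@prefix}:limit", value)
    else
      @conn.del("#{@prefix}:limit")
    end
  end
end

sem = LimitSketch.new(FakeRedis.new, "example:queue:default")
sem.limit       # => nil (nothing stored yet)
sem.limit = 5
sem.limit       # => 5
sem.limit = nil
sem.limit       # => nil (key deleted)
```

Because 0 is truthy in Ruby, assigning 0 stores "0" rather than clearing the key, so a limit of 0 and an absent limit remain distinguishable. The same pattern applies to process_limit with the ":process_limit" key.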
#process_limit ⇒ Integer?
Returns the per-process concurrency limit for this queue, or nil if no limit is stored.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 59
def process_limit
  redis { |conn| conn.get("#{@prefix}:process_limit") }&.to_i
end
#process_limit=(value) ⇒ Object
Sets the per-process concurrency limit for this queue. Passing nil deletes the stored limit.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 66
def process_limit=(value)
  if value
    redis { |conn| conn.set("#{@prefix}:process_limit", value) }
  else
    redis { |conn| conn.del("#{@prefix}:process_limit") }
  end
end
#release ⇒ Object
Releases one busy slot for this capsule on the queue. Called when a job finishes processing (acknowledge) or is requeued.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 76
def release
  redis { |conn| conn.lrem("#{@prefix}:busy", 1, @capsule_uuid) }
end
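The release call relies on Redis LREM with a count of 1, which removes at most one matching entry, scanning from the head of the list, and returns the number removed. The sketch below imitates that behavior on a plain Ruby Array to show why a capsule holding several busy slots gives back only one per call. The helper lrem_from_head and the capsule identifiers are hypothetical and exist only for this illustration.

```ruby
# Imitates Redis LREM for a positive count: removes up to `count`
# occurrences of `element`, scanning from the head of the list.
# Returns the number of entries removed.
def lrem_from_head(list, count, element)
  removed = 0
  list.reject! do |item|
    if removed < count && item == element
      removed += 1
      true
    else
      false
    end
  end
  removed
end

busy = ["capsule-a", "capsule-b", "capsule-a"]
lrem_from_head(busy, 1, "capsule-a") # => 1
busy                                 # => ["capsule-b", "capsule-a"]
```

If the capsule has no entry in the list, nothing is removed and the count is 0, so an extra release in this model is harmless.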