Class: Sidekiq::LimitFetch::Global::QueueSemaphore

Inherits:
SemaphoreBase
  • Object
Defined in:
lib/sidekiq/limit_fetch/global/semaphore.rb

Overview

Manages concurrency semaphore state for an individual queue. Stores global limits, per-process limits, and the busy list in Redis.

Constant Summary

Constants inherited from SemaphoreBase

SemaphoreBase::PREFIX

Instance Method Summary

Constructor Details

#initialize(capsule, queue_name) ⇒ QueueSemaphore

Returns a new instance of QueueSemaphore.



# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32

def initialize(capsule, queue_name)
  super(capsule)
  queue_name = queue_name.delete_prefix("queue:")
  @prefix    = "#{PREFIX}:queue:#{queue_name}"
end
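
The key prefix built by the constructor can be sketched in isolation. This is a minimal illustration, not the library's code: the value of `PREFIX` below is a hypothetical stand-in (the real `SemaphoreBase::PREFIX` is not shown on this page), and `queue_prefix` is a helper name invented for the example. It shows that a queue name with or without a leading "queue:" resolves to the same prefix.

```ruby
# Hypothetical stand-in; the real SemaphoreBase::PREFIX value is not shown here.
PREFIX = "limit_fetch"

# Hypothetical helper mirroring the two assignments in #initialize.
def queue_prefix(queue_name)
  # Drop a leading "queue:" so both spellings share one key namespace.
  name = queue_name.delete_prefix("queue:")
  "#{PREFIX}:queue:#{name}"
end

puts queue_prefix("queue:mailers")
puts queue_prefix("mailers")
```

The methods below append ":limit", ":process_limit", and ":busy" to this prefix to form the Redis keys they read and write.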

Instance Method Details

#limit ⇒ Integer?

Returns the global concurrency limit for this queue, or nil if no limit is set.

Returns:

  • (Integer, nil)


# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41

def limit
  redis { |conn| conn.get("#{@prefix}:limit") }&.to_i
end
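
The nil handling in the reader relies on Ruby's safe navigation operator: Redis GET yields nil for a missing key and a string otherwise, so `&.to_i` passes nil through and converts anything else. A small sketch of that conversion, where `parse_limit` is a hypothetical helper name used only for this example:

```ruby
# Hypothetical helper: applies the same conversion as the reader's `&.to_i`.
def parse_limit(raw)
  raw&.to_i
end

p parse_limit(nil)   # missing key stays nil
p parse_limit("5")   # stored string becomes an Integer
p parse_limit("abc") # String#to_i yields 0 for non-numeric text
```

One consequence worth keeping in mind: a non-numeric value stored under the key would read back as 0 rather than raising.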

#limit=(value) ⇒ Object

Sets the global concurrency limit for this queue. Passing nil removes the limit by deleting the key.

Parameters:

  • value (Integer, nil)


# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 48

def limit=(value)
  if value
    redis { |conn| conn.set("#{@prefix}:limit", value) }
  else
    redis { |conn| conn.del("#{@prefix}:limit") }
  end
end
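
The set-or-delete branch can be exercised without a Redis server. The sketch below uses `FakeConn`, a hypothetical in-memory stand-in that supports only GET, SET, and DEL, and `write_limit`, a hypothetical helper that mirrors the branch above; neither is part of the library. It also shows that 0 is truthy in Ruby, so assigning 0 stores "0" rather than deleting the key.

```ruby
# Hypothetical in-memory stand-in for a Redis connection (GET/SET/DEL only).
class FakeConn
  def initialize
    @store = {}
  end

  def get(key)
    @store[key]
  end

  def set(key, value)
    @store[key] = value.to_s # Redis stores values as strings
    "OK"
  end

  def del(key)
    @store.delete(key) ? 1 : 0
  end
end

# Hypothetical helper mirroring the writer: truthy values are written,
# nil (or false) deletes the key.
def write_limit(conn, key, value)
  if value
    conn.set(key, value)
  else
    conn.del(key)
  end
end

conn = FakeConn.new
key  = "example:queue:mailers:limit" # hypothetical key name

write_limit(conn, key, 5)
p conn.get(key)&.to_i

write_limit(conn, key, 0) # 0 is truthy in Ruby, so it is stored, not deleted
p conn.get(key)&.to_i

write_limit(conn, key, nil)
p conn.get(key)&.to_i
```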

#process_limit ⇒ Integer?

Returns the per-process concurrency limit for this queue, or nil if no limit is set.

Returns:

  • (Integer, nil)


# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 59

def process_limit
  redis { |conn| conn.get("#{@prefix}:process_limit") }&.to_i
end

#process_limit=(value) ⇒ Object

Sets the per-process concurrency limit for this queue. Passing nil removes the limit by deleting the key.

Parameters:

  • value (Integer, nil)


# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 66

def process_limit=(value)
  if value
    redis { |conn| conn.set("#{@prefix}:process_limit", value) }
  else
    redis { |conn| conn.del("#{@prefix}:process_limit") }
  end
end

#release ⇒ Object

Releases one busy slot for this capsule on the queue. Called when a job finishes processing (acknowledge) or is requeued.



# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 76

def release
  redis { |conn| conn.lrem("#{@prefix}:busy", 1, @capsule_uuid) }
end
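
Redis LREM with a count of 1 removes only the first element equal to the given value, scanning from head to tail, and returns the number of elements removed. The sketch below imitates that on a plain Ruby Array to show why one call frees exactly one slot. It assumes the busy list holds one capsule UUID per in-flight job, which is inferred from the call above since the acquire side is not shown here; `lrem_first` and the sample values are hypothetical.

```ruby
# Hypothetical Array-based imitation of `LREM key 1 value`.
# Removes the first matching element and returns how many were removed.
def lrem_first(list, value)
  index = list.index(value)
  return 0 if index.nil?

  list.delete_at(index)
  1
end

busy = ["capsule-a", "capsule-b", "capsule-a"] # sample data, not real UUIDs
removed = lrem_first(busy, "capsule-a")

p removed
p busy
```

Releasing for a capsule that has no entry in the list removes nothing and returns 0, so a stray extra release does not affect other capsules' slots.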