Class: Sidekiq::LimitFetch::Global::CapsuleSemaphor
- Inherits: SemaphoreBase
  - Object
  - SemaphoreBase
  - Sidekiq::LimitFetch::Global::CapsuleSemaphor
- Defined in: lib/sidekiq/limit_fetch/global/semaphore.rb
Overview
Manages capsule-level heartbeat registration and dead-capsule reaping. Each Sidekiq process registers itself and periodically heartbeats to signal liveness. Stale capsules are reaped and their busy slots freed.
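The lifecycle described above can be illustrated with a small in-memory model. This is only a sketch: `ToyRegistry`, its hash-based state, and the timestamps are hypothetical stand-ins for the Redis keys, not the gem's implementation.

```ruby
# In-memory stand-in for the Redis state (hypothetical, for illustration only).
class ToyRegistry
  attr_reader :capsules, :busy

  def initialize(ttl)
    @ttl = ttl                              # seconds a heartbeat stays valid
    @capsules = {}                          # uuid => time of last heartbeat
    @busy = Hash.new { |h, k| h[k] = [] }   # queue => uuids holding a busy slot
  end

  # Record liveness for one capsule, then reap any capsule that went silent.
  def heartbeat(uuid, now)
    @capsules[uuid] = now
    reap(now)
  end

  # Remove capsules whose last heartbeat is too old and free their busy slots.
  def reap(now)
    dead = @capsules.select { |_, beat| now - beat >= @ttl }.keys
    dead.each do |uuid|
      @capsules.delete(uuid)
      @busy.each_value { |slots| slots.delete(uuid) }
    end
    dead
  end
end

registry = ToyRegistry.new(8)
registry.heartbeat("a", 0)
registry.heartbeat("b", 0)
registry.busy["default"] << "b" << "b"   # capsule b holds two busy slots
registry.heartbeat("a", 6)               # b is quiet but still within the TTL
reaped = registry.heartbeat("a", 10)     # b has now been silent for 10 seconds
```

In this model, capsule `b` is removed on the last call and both of its busy slots on the `default` queue are released.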
Constant Summary
Constants inherited from SemaphoreBase
Instance Method Summary
- #heartbeat ⇒ Object
  Sets a heartbeat key in Redis and ensures the capsule UUID is in the set of active capsules.
- #initialize(capsule) ⇒ CapsuleSemaphor (constructor)
  A new instance of CapsuleSemaphor.
- #list ⇒ Array<String>
  Returns all registered capsule UUIDs.
- #list_dead ⇒ Array<String>
  Identifies dead capsules by checking for missing heartbeat keys.
- #purge(uuids) ⇒ Object
  Removes dead capsules from the registry and cleans up their busy slots across all queues managed by this capsule.
- #reap ⇒ Object
  Finds and purges dead capsules.
Constructor Details
#initialize(capsule) ⇒ CapsuleSemaphor
Returns a new instance of CapsuleSemaphor.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 85

    def initialize(capsule)
      super
      @prefix = "#{PREFIX}:capsule:#{@capsule_uuid}"
    end
Instance Method Details
#heartbeat ⇒ Object
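Sets a heartbeat key in Redis and ensures the capsule UUID is in the set of active capsules. The key expires after four heartbeat periods, so a capsule that stops heartbeating is eventually reported as dead by #list_dead. Each call also runs #reap. A heartbeat takes roughly 2.5 ms on macOS.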
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 92

    def heartbeat
      redis do |conn|
        conn.multi do |multi|
          multi.set("#{@prefix}:heartbeat", "1", "ex", LimitFetch.configuration[:heartbeat_period] * 4)
          multi.sadd("#{SemaphoreBase::PREFIX}:capsules", @capsule_uuid)
        end
      end
      reap
    end
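The expiry arithmetic can be sketched in isolation. The multiplier of 4 comes from the source above; the period of 2 seconds and the helper names `heartbeat_ttl` and `heartbeat_alive?` are assumptions made for illustration, since the configured value of `heartbeat_period` is not shown here.

```ruby
TTL_MULTIPLIER = 4 # taken from the source: heartbeat_period * 4

# Seconds after which a heartbeat key expires (hypothetical helper).
def heartbeat_ttl(heartbeat_period)
  heartbeat_period * TTL_MULTIPLIER
end

# Simplified model of key expiry: the key is considered present
# until a full TTL has elapsed since the last heartbeat.
def heartbeat_alive?(last_beat_at, now, heartbeat_period)
  now - last_beat_at < heartbeat_ttl(heartbeat_period)
end

period = 2 # hypothetical value, in seconds
ttl = heartbeat_ttl(period)
still_alive = heartbeat_alive?(0, 7, period)
expired = !heartbeat_alive?(0, 8, period)
```

Under this model a capsule can skip three consecutive heartbeats before its key disappears, which leaves some slack for a briefly stalled process.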
#list ⇒ Array<String>
Returns all registered capsule UUIDs.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 106

    def list
      redis { |conn| conn.smembers("#{SemaphoreBase::PREFIX}:capsules") }
    end
#list_dead ⇒ Array<String>
Identifies dead capsules by checking for missing heartbeat keys.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 119

    def list_dead
      uuids = list
      return uuids if uuids.empty?

      heartbeat_keys = uuids.map { |uuid| "#{SemaphoreBase::PREFIX}:capsule:#{uuid}:heartbeat" }
      heartbeat_statuses = redis { |conn| conn.mget(*heartbeat_keys) }
      uuids.zip(heartbeat_statuses).filter_map { |uuid, status| uuid if status != "1" }
    end
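The final filtering step can be tried without Redis. The sketch below assumes sample UUIDs and a hand-written MGET result; `dead_uuids` is a hypothetical helper that mirrors the last line of the method above.

```ruby
# Pair each UUID with the value returned for its heartbeat key and keep
# the UUIDs whose key is missing. MGET returns nil for a key that does
# not exist, which includes keys that have expired.
def dead_uuids(uuids, heartbeat_statuses)
  uuids.zip(heartbeat_statuses).filter_map { |uuid, status| uuid if status != "1" }
end

uuids = ["capsule-a", "capsule-b", "capsule-c"]  # sample data
statuses = ["1", nil, "1"]                       # capsule-b's key has expired
dead = dead_uuids(uuids, statuses)
```

Because MGET returns values in the same order as the requested keys, `zip` is enough to line each status up with its UUID.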
#purge(uuids) ⇒ Object
Removes dead capsules from the registry and cleans up their busy slots across all queues managed by this capsule.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 131

    def purge(uuids)
      return if uuids.empty?

      redis do |conn|
        conn.multi do |multi|
          multi.srem("#{SemaphoreBase::PREFIX}:capsules", *uuids)
          uuids.each do |uuid|
            multi.del("#{SemaphoreBase::PREFIX}:capsule:#{uuid}:heartbeat")
            @capsule_meta.queue_set.each do |queue|
              multi.lrem("#{SemaphoreBase::PREFIX}:queue:#{queue}:busy", 0, uuid)
            end
          end
        end
      end
    end
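To see which keys a purge touches, the same loop can be written as a pure function that returns the commands instead of sending them. This is a sketch only: `purge_commands` is a hypothetical helper, and the prefix `"limit_fetch"` is an assumed placeholder because the real value of `SemaphoreBase::PREFIX` is not shown here.

```ruby
# Build the list of Redis commands a purge would queue (illustrative only).
# The default prefix is a placeholder, not the gem's actual PREFIX.
def purge_commands(uuids, queues, prefix: "limit_fetch")
  return [] if uuids.empty?

  # One SREM removes every dead UUID from the capsule registry.
  commands = [["SREM", "#{prefix}:capsules", *uuids]]
  uuids.each do |uuid|
    # Drop any leftover heartbeat key for the dead capsule.
    commands << ["DEL", "#{prefix}:capsule:#{uuid}:heartbeat"]
    queues.each do |queue|
      # A count of 0 tells LREM to remove every occurrence of the UUID.
      commands << ["LREM", "#{prefix}:queue:#{queue}:busy", 0, uuid]
    end
  end
  commands
end

commands = purge_commands(["capsule-b"], ["default", "mailers"])
```

For one dead capsule and two queues this yields four commands: one SREM, one DEL, and one LREM per queue. Note that only the queues in the purging capsule's own queue set are cleaned.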
#reap ⇒ Object
Finds and purges dead capsules. Called on every heartbeat.
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 111

    def reap
      dead_capsules = list_dead
      purge(dead_capsules)
    end