Class: Sidekiq::LimitFetch::Global::CapsuleSemaphor

Inherits:
SemaphoreBase
  • Object
show all
Defined in:
lib/sidekiq/limit_fetch/global/semaphore.rb

Overview

Manages capsule-level heartbeat registration and dead-capsule reaping. Each Sidekiq process registers itself and periodically heartbeats to signal liveness. Stale capsules are reaped and their busy slots freed.

Constant Summary

Constants inherited from SemaphoreBase

SemaphoreBase::PREFIX

Instance Method Summary collapse

Constructor Details

#initialize(capsule) ⇒ CapsuleSemaphor

Returns a new instance of CapsuleSemaphor.



85
86
87
88
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 85

def initialize(capsule)
  super
  @prefix = "#{PREFIX}:capsule:#{@capsule_uuid}"
end

Instance Method Details

#heartbeatObject

Set a heartbeat key in Redis and ensure the capsule UUID is in the active set in Redis. Heartbeat takes ~2.5ms on macOS



92
93
94
95
96
97
98
99
100
101
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 92

def heartbeat
  redis do |conn|
    conn.multi do |multi|
      multi.set("#{@prefix}:heartbeat", "1", "ex", LimitFetch.configuration[:heartbeat_period] * 4)
      multi.sadd("#{SemaphoreBase::PREFIX}:capsules", @capsule_uuid)
    end
  end

  reap
end

#listArray<String>

Returns all registered capsule UUIDs.

Returns:

  • (Array<String>)


106
107
108
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 106

def list
  redis { |conn| conn.smembers("#{SemaphoreBase::PREFIX}:capsules") }
end

#list_deadArray<String>

Identifies dead capsules by checking for missing heartbeat keys.

Returns:

  • (Array<String>)


119
120
121
122
123
124
125
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 119

def list_dead
  uuids = list
  return uuids if uuids.empty?
  heartbeat_keys = uuids.map { |uuid| "#{SemaphoreBase::PREFIX}:capsule:#{uuid}:heartbeat" }
  heartbeat_statuses = redis { |conn| conn.mget(*heartbeat_keys) }
  uuids.zip(heartbeat_statuses).filter_map { |uuid, status| uuid if status != "1" }
end

#purge(uuids) ⇒ Object

Removes dead capsules from the registry and cleans up their busy slots across all queues managed by this capsule.

Parameters:

  • uuids (Array<String>)


131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 131

def purge(uuids)
  return if uuids.empty?
  redis do |conn|
    conn.multi do |multi|
      multi.srem("#{SemaphoreBase::PREFIX}:capsules", *uuids)
      uuids.each do |uuid|
        multi.del("#{SemaphoreBase::PREFIX}:capsule:#{uuid}:heartbeat")
        @capsule_meta.queue_set.each do |queue|
          multi.lrem("#{SemaphoreBase::PREFIX}:queue:#{queue}:busy", 0, uuid)
        end
      end
    end
  end
end

#reapObject

Finds and purges dead capsules. Called on every heartbeat.



111
112
113
114
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 111

def reap
  dead_capsules = list_dead
  purge(dead_capsules)
end