Class: Philiprehberger::Semaphore::Counter
Inherits: Object
Defined in: lib/philiprehberger/semaphore.rb
Overview
Counting semaphore for concurrent access control with timeouts
Instance Method Summary
- #acquire(weight: 1) { ... } ⇒ Object
  Acquire one or more permits, blocking until available.
- #acquired_count ⇒ Integer
  Return the number of permits currently held by acquirers.
- #available ⇒ Integer
  Return the number of currently available permits.
- #drain ⇒ void
  Drain the semaphore: reject new acquisitions and block until all permits are returned.
- #draining? ⇒ Boolean
  Return whether this semaphore is currently draining.
- #fair? ⇒ Boolean
  Return whether this semaphore uses FIFO fairness.
- #initialize(permits:, fair: false) ⇒ Counter (constructor)
  Create a new counting semaphore.
- #permits ⇒ Integer
  Return the total number of permits.
- #release(weight: 1) ⇒ void
  Release one or more permits back to the semaphore.
- #resize(new_permits) ⇒ void
  Resize the semaphore to a new total permit count.
- #try_acquire(timeout:, weight: 1) { ... } ⇒ Object, false
  Try to acquire one or more permits within the given timeout.
Constructor Details
#initialize(permits:, fair: false) ⇒ Counter
Create a new counting semaphore
    # File 'lib/philiprehberger/semaphore.rb', line 20

    def initialize(permits:, fair: false)
      raise Error, 'permits must be a positive integer' unless permits.is_a?(Integer) && permits.positive?

      @permits = permits
      @available = permits
      @mutex = Mutex.new
      @fair = fair
      @draining = false
      @drain_condition = ConditionVariable.new

      if @fair
        @queue = []
      else
        @condition = ConditionVariable.new
      end
    end
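The guard at the top of the constructor accepts only positive Integer values. The following is a minimal sketch of that check in isolation; `valid_permits?` is a hypothetical helper written for illustration and is not part of the library.

```ruby
# Hypothetical helper (not part of the gem) that mirrors the constructor's
# guard: the value must be an Integer and strictly greater than zero.
def valid_permits?(value)
  value.is_a?(Integer) && value.positive?
end

p valid_permits?(3)    # true
p valid_permits?(0)    # false: zero is not positive
p valid_permits?(2.5)  # false: Float, not Integer
p valid_permits?('3')  # false: String, not Integer
```

In the real constructor a failing check raises Error instead of returning false.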
Instance Method Details
#acquire(weight: 1) { ... } ⇒ Object
Acquire one or more permits, blocking until available
    # File 'lib/philiprehberger/semaphore.rb', line 102

    def acquire(weight: 1)
      validate_weight!(weight)

      @mutex.synchronize do
        raise Error, 'semaphore is draining' if @draining

        if @fair
          cv = ConditionVariable.new
          @queue.push(cv)

          loop do
            raise_if_draining_fair!(cv)
            break if @queue.first == cv && @available >= weight

            cv.wait(@mutex)
          end

          @queue.shift
        else
          loop do
            raise Error, 'semaphore is draining' if @draining
            break if @available >= weight

            @condition.wait(@mutex)
          end
        end

        @available -= weight
      end

      if block_given?
        begin
          yield
        ensure
          release(weight: weight)
        end
      else
        true
      end
    end
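The block form above releases the permit in an `ensure` clause, so the permit is returned even when the block raises. The sketch below illustrates that pattern with a reduced stand-in class. `TinySemaphore` and `peak_concurrency` are hypothetical names introduced here; the stand-in covers only the non-fair path with a weight of 1 and omits draining.

```ruby
# Hypothetical stand-in (not the gem's class): non-fair path, weight 1 only.
class TinySemaphore
  def initialize(permits)
    @available = permits
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Block form only: wait for a permit, run the block, always give it back.
  def acquire
    @mutex.synchronize do
      @condition.wait(@mutex) until @available >= 1
      @available -= 1
    end
    begin
      yield
    ensure
      release
    end
  end

  def release
    @mutex.synchronize do
      @available += 1
      @condition.signal
    end
  end

  def available
    @mutex.synchronize { @available }
  end
end

# Runs `workers` threads through a semaphore with `permits` permits and
# returns the highest number of threads seen inside the block at once.
def peak_concurrency(permits:, workers:)
  sem = TinySemaphore.new(permits)
  lock = Mutex.new
  current = 0
  peak = 0

  threads = Array.new(workers) do
    Thread.new do
      sem.acquire do
        lock.synchronize do
          current += 1
          peak = current if current > peak
        end
        sleep 0.01 # simulated work while holding the permit
        lock.synchronize { current -= 1 }
      end
    end
  end
  threads.each(&:join)
  peak
end

puts peak_concurrency(permits: 2, workers: 6) # never more than 2
```

The reported peak depends on thread scheduling, but it cannot exceed the permit count.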
#acquired_count ⇒ Integer
Return the number of permits currently held by acquirers
Equivalent to `permits - available` but computed under the internal lock for a consistent snapshot. Useful for observability and metrics.
    # File 'lib/philiprehberger/semaphore.rb', line 57

    def acquired_count
      @mutex.synchronize { @permits - @available }
    end
#available ⇒ Integer
Return the number of currently available permits
    # File 'lib/philiprehberger/semaphore.rb', line 47

    def available
      @mutex.synchronize { @available }
    end
#drain ⇒ void
This method returns an undefined value.
Drain the semaphore: reject new acquisitions and block until all permits are returned
Once draining begins, #acquire raises Error and #try_acquire returns false. Acquirers that are already waiting are woken so they can observe the draining state and exit. This method blocks the calling thread until every outstanding permit has been released. After drain completes, the semaphore remains in the draining state.
    # File 'lib/philiprehberger/semaphore.rb', line 82

    def drain
      @mutex.synchronize do
        @draining = true

        # Wake all waiting acquirers so they can observe the draining state and exit
        if @fair
          @queue.each(&:signal)
        else
          @condition.broadcast
        end

        @drain_condition.wait(@mutex) while @available < @permits
      end
    end
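The drain sequence described above (set the flag, wait for outstanding permits, keep rejecting afterwards) can be sketched with a reduced stand-in. `DrainableSemaphore` and `drain_demo` are hypothetical names for this illustration. The stand-in's `acquire` raises instead of blocking when no permit is free, which is a simplification and differs from the documented class.

```ruby
# Hypothetical stand-in (not the gem's class) showing only the drain logic.
class DrainableSemaphore
  class Error < StandardError; end

  def initialize(permits)
    @permits = permits
    @available = permits
    @draining = false
    @mutex = Mutex.new
    @drain_condition = ConditionVariable.new
  end

  # Simplified: raises instead of blocking when no permit is free.
  def acquire
    @mutex.synchronize do
      raise Error, 'semaphore is draining' if @draining
      raise Error, 'no permit available (sketch does not block)' if @available < 1

      @available -= 1
    end
    true
  end

  def release
    @mutex.synchronize do
      @available += 1
      # Wake the drainer once every permit is back.
      @drain_condition.signal if @draining && @available >= @permits
    end
  end

  def drain
    @mutex.synchronize do
      @draining = true
      @drain_condition.wait(@mutex) while @available < @permits
    end
  end
end

# Returns the order of events when one permit is held while drain is called.
def drain_demo
  sem = DrainableSemaphore.new(1)
  events = []
  sem.acquire

  holder = Thread.new do
    sleep 0.05
    events << :released
    sem.release
  end

  sem.drain # blocks until the holder thread releases
  events << :drained
  holder.join

  begin
    sem.acquire
  rescue DrainableSemaphore::Error
    events << :rejected # still draining after drain returns
  end
  events
end

p drain_demo # [:released, :drained, :rejected]
```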
#draining? ⇒ Boolean
Return whether this semaphore is currently draining
    # File 'lib/philiprehberger/semaphore.rb', line 71

    def draining?
      @mutex.synchronize { @draining }
    end
#fair? ⇒ Boolean
Return whether this semaphore uses FIFO fairness
    # File 'lib/philiprehberger/semaphore.rb', line 64

    def fair?
      @fair
    end
#permits ⇒ Integer
Return the total number of permits
    # File 'lib/philiprehberger/semaphore.rb', line 40

    def permits
      @mutex.synchronize { @permits }
    end
#release(weight: 1) ⇒ void
This method returns an undefined value.
Release one or more permits back to the semaphore
    # File 'lib/philiprehberger/semaphore.rb', line 215

    def release(weight: 1)
      validate_weight!(weight)

      @mutex.synchronize do
        raise Error, 'cannot release more permits than total' if @available + weight > @permits

        @available += weight

        if @draining
          @drain_condition.signal if @available >= @permits
        elsif @fair
          @queue.each(&:signal)
        else
          weight.times { @condition.signal }
        end
      end
    end
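The guard in the listing above rejects a release only when it would push the available count past the total. The sketch below models that arithmetic as a pure function. `released_available` is a hypothetical name, and it raises ArgumentError where the library raises its own Error class.

```ruby
# Hypothetical pure-function model of the bookkeeping in #release.
# Raises ArgumentError here; the documented method raises Error.
def released_available(available:, permits:, weight: 1)
  if available + weight > permits
    raise ArgumentError, 'cannot release more permits than total'
  end

  available + weight
end

p released_available(available: 1, permits: 3, weight: 2) # 3

begin
  released_available(available: 2, permits: 3, weight: 2)
rescue ArgumentError => e
  puts e.message # cannot release more permits than total
end
```

Because the check compares against the total only, it catches over-release but does not track which caller holds which permit.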
#resize(new_permits) ⇒ void
This method returns an undefined value.
Resize the semaphore to a new total permit count
    # File 'lib/philiprehberger/semaphore.rb', line 238

    def resize(new_permits)
      raise Error, 'permits must be a positive integer' unless new_permits.is_a?(Integer) && new_permits.positive?

      @mutex.synchronize do
        diff = new_permits - @permits
        @permits = new_permits
        @available += diff

        # If permits increased, wake waiters that may now be able to acquire
        if diff.positive?
          if @fair
            @queue.each(&:signal)
          else
            diff.times { @condition.signal }
          end
        end
      end
    end
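The listing above adjusts the available count by the same difference as the total, so the number of held permits (`permits - available`) is unchanged by a resize. A worked sketch of that arithmetic follows; `resized_state` is a hypothetical helper for illustration and operates on plain integers, not on the semaphore.

```ruby
# Hypothetical model of the bookkeeping in #resize: returns the new
# [permits, available] pair for a given starting state.
def resized_state(permits:, available:, new_permits:)
  diff = new_permits - permits
  [new_permits, available + diff]
end

# Start: 5 permits total, 4 held, 1 available.
p resized_state(permits: 5, available: 1, new_permits: 8) # [8, 4]
p resized_state(permits: 5, available: 1, new_permits: 2) # [2, -2]
```

In the second case the available count goes negative: four permits are still held against a total of two, so by this arithmetic new acquirers would wait until enough permits are released to bring the count back up to their weight.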
#try_acquire(timeout:, weight: 1) { ... } ⇒ Object, false
Try to acquire one or more permits within the given timeout
    # File 'lib/philiprehberger/semaphore.rb', line 146

    def try_acquire(timeout:, weight: 1)
      validate_weight!(weight)
      deadline = Time.now + timeout
      acquired = false

      @mutex.synchronize do
        return false if @draining

        if @fair
          cv = ConditionVariable.new
          @queue.push(cv)

          loop do
            if @draining
              @queue.delete(cv)
              break
            end

            if @queue.first == cv && @available >= weight
              @queue.shift
              acquired = true
              break
            end

            remaining = deadline - Time.now
            if remaining <= 0
              @queue.delete(cv)
              break
            end

            cv.wait(@mutex, remaining)
          end
        else
          loop do
            break if @draining

            if @available >= weight
              acquired = true
              break
            end

            remaining = deadline - Time.now
            break if remaining <= 0

            @condition.wait(@mutex, remaining)
          end
        end

        @available -= weight if acquired
      end

      return false unless acquired

      if block_given?
        begin
          yield
        ensure
          release(weight: weight)
        end
      else
        true
      end
    end
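The timed wait above recomputes the remaining time on every pass, so a spurious or early wakeup does not extend the overall timeout. The sketch below shows that deadline loop in a reduced stand-in. `TimedSemaphore` is a hypothetical class covering only the non-fair path with a weight of 1. It reads a monotonic clock through `Process.clock_gettime`, which is a substitution made for this sketch; the listing above uses `Time.now`.

```ruby
# Hypothetical stand-in (not the gem's class): non-fair path, weight 1 only.
class TimedSemaphore
  def initialize(permits)
    @available = permits
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Returns true if a permit was taken before the deadline, false otherwise.
  def try_acquire(timeout:)
    deadline = now + timeout
    @mutex.synchronize do
      until @available >= 1
        remaining = deadline - now
        return false if remaining <= 0 # synchronize unlocks on return

        @condition.wait(@mutex, remaining)
      end
      @available -= 1
    end
    true
  end

  def release
    @mutex.synchronize do
      @available += 1
      @condition.signal
    end
  end

  private

  # Assumption of this sketch: monotonic clock instead of Time.now, so
  # wall-clock adjustments cannot shorten or lengthen the wait.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

sem = TimedSemaphore.new(1)
puts sem.try_acquire(timeout: 0.1) # true: a permit is free
puts sem.try_acquire(timeout: 0.1) # false: the permit is held, wait times out
```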