Class: Philiprehberger::Semaphore::Counter
- Inherits:
-
Object
- Object
- Philiprehberger::Semaphore::Counter
- Defined in:
- lib/philiprehberger/semaphore.rb
Overview
Counting semaphore for concurrent access control with timeouts
Instance Method Summary collapse
-
#acquire(weight: 1) { ... } ⇒ Object
Acquire one or more permits, blocking until available.
-
#acquired_count ⇒ Integer
Return the number of permits currently held by acquirers.
-
#available ⇒ Integer
Return the number of currently available permits.
-
#drain ⇒ void
Drain the semaphore: reject new acquisitions and block until all permits are returned.
-
#drained? ⇒ Boolean
Return whether the semaphore has finished draining: it is in the draining state AND every outstanding permit has been released.
-
#draining? ⇒ Boolean
Return whether this semaphore is currently draining.
-
#fair? ⇒ Boolean
Return whether this semaphore uses FIFO fairness.
-
#initialize(permits:, fair: false) ⇒ Counter
constructor
Create a new counting semaphore.
-
#permits ⇒ Integer
Return the total number of permits.
-
#release(weight: 1) ⇒ void
Release one or more permits back to the semaphore.
-
#resize(new_permits) ⇒ void
Resize the semaphore to a new total permit count.
-
#try_acquire(timeout:, weight: 1) { ... } ⇒ Object, false
Try to acquire one or more permits within the given timeout.
Constructor Details
#initialize(permits:, fair: false) ⇒ Counter
Create a new counting semaphore
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/philiprehberger/semaphore.rb', line 20 def initialize(permits:, fair: false) raise Error, 'permits must be a positive integer' unless permits.is_a?(Integer) && permits.positive? @permits = permits @available = permits @mutex = Mutex.new @fair = fair @draining = false @drain_condition = ConditionVariable.new if @fair @queue = [] else @condition = ConditionVariable.new end end |
Instance Method Details
#acquire(weight: 1) { ... } ⇒ Object
Acquire one or more permits, blocking until available
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/philiprehberger/semaphore.rb', line 114 def acquire(weight: 1) validate_weight!(weight) @mutex.synchronize do raise Error, 'semaphore is draining' if @draining if @fair cv = ConditionVariable.new @queue.push(cv) loop do raise_if_draining_fair!(cv) break if @queue.first == cv && @available >= weight cv.wait(@mutex) end @queue.shift else loop do raise Error, 'semaphore is draining' if @draining break if @available >= weight @condition.wait(@mutex) end end @available -= weight end if block_given? begin yield ensure release(weight: weight) end else true end end |
#acquired_count ⇒ Integer
Return the number of permits currently held by acquirers
Equivalent to ‘permits - available` but computed under the internal lock for a consistent snapshot. Useful for observability and metrics.
57 58 59 |
# File 'lib/philiprehberger/semaphore.rb', line 57 def acquired_count @mutex.synchronize { @permits - @available } end |
#available ⇒ Integer
Return the number of currently available permits
47 48 49 |
# File 'lib/philiprehberger/semaphore.rb', line 47 def available @mutex.synchronize { @available } end |
#drain ⇒ void
This method returns an undefined value.
Drain the semaphore: reject new acquisitions and block until all permits are returned
Once draining begins, #acquire raises Error and #try_acquire returns false. This method blocks the calling thread until every outstanding permit has been released. After drain completes the semaphore remains in the draining state.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/philiprehberger/semaphore.rb', line 94 def drain @mutex.synchronize do @draining = true # Wake all waiting acquirers so they can observe the draining state and exit if @fair @queue.each(&:signal) else @condition.broadcast end @drain_condition.wait(@mutex) while @available < @permits end end |
#drained? ⇒ Boolean
Return whether the semaphore has finished draining: it is in the draining state AND every outstanding permit has been released.
Useful for non-blocking polling alongside #drain, or as a clean one-shot condition check in shutdown sequences without inspecting both #draining? and #acquired_count.
83 84 85 |
# File 'lib/philiprehberger/semaphore.rb', line 83 def drained? @mutex.synchronize { @draining && @available == @permits } end |
#draining? ⇒ Boolean
Return whether this semaphore is currently draining
71 72 73 |
# File 'lib/philiprehberger/semaphore.rb', line 71 def draining? @mutex.synchronize { @draining } end |
#fair? ⇒ Boolean
Return whether this semaphore uses FIFO fairness
64 65 66 |
# File 'lib/philiprehberger/semaphore.rb', line 64 def fair? @fair end |
#permits ⇒ Integer
Return the total number of permits
40 41 42 |
# File 'lib/philiprehberger/semaphore.rb', line 40 def permits @mutex.synchronize { @permits } end |
#release(weight: 1) ⇒ void
This method returns an undefined value.
Release one or more permits back to the semaphore
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/philiprehberger/semaphore.rb', line 227 def release(weight: 1) validate_weight!(weight) @mutex.synchronize do raise Error, 'cannot release more permits than total' if @available + weight > @permits @available += weight if @draining @drain_condition.signal if @available >= @permits elsif @fair @queue.each(&:signal) else weight.times { @condition.signal } end end end |
#resize(new_permits) ⇒ void
This method returns an undefined value.
Resize the semaphore to a new total permit count
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/philiprehberger/semaphore.rb', line 250 def resize(new_permits) raise Error, 'permits must be a positive integer' unless new_permits.is_a?(Integer) && new_permits.positive? @mutex.synchronize do diff = new_permits - @permits @permits = new_permits @available += diff # If permits increased, wake waiters that may now be able to acquire if diff.positive? if @fair @queue.each(&:signal) else diff.times { @condition.signal } end end end end |
#try_acquire(timeout:, weight: 1) { ... } ⇒ Object, false
Try to acquire one or more permits within the given timeout
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/philiprehberger/semaphore.rb', line 158 def try_acquire(timeout:, weight: 1) validate_weight!(weight) deadline = Time.now + timeout acquired = false @mutex.synchronize do return false if @draining if @fair cv = ConditionVariable.new @queue.push(cv) loop do if @draining @queue.delete(cv) break end if @queue.first == cv && @available >= weight @queue.shift acquired = true break end remaining = deadline - Time.now if remaining <= 0 @queue.delete(cv) break end cv.wait(@mutex, remaining) end else loop do break if @draining if @available >= weight acquired = true break end remaining = deadline - Time.now break if remaining <= 0 @condition.wait(@mutex, remaining) end end @available -= weight if acquired end return false unless acquired if block_given? begin yield ensure release(weight: weight) end else true end end |