Module: Pgbus::Concurrency::Semaphore
- Defined in: lib/pgbus/concurrency/semaphore.rb
Class Method Summary
- .acquire(key, max_value, duration) ⇒ Object
  Attempt to acquire a slot in the semaphore for the given key.
- .current_value(key) ⇒ Object
  Check current value for a key.
- .expire_stale ⇒ Object
  Delete semaphores that have expired (safety net for crashed workers).
- .release(key) ⇒ Object
  Release one slot in the semaphore.
Class Method Details
.acquire(key, max_value, duration) ⇒ Object
Attempt to acquire a slot in the semaphore for the given key. Returns :acquired if a slot was available, :blocked if the limit is reached.
# File 'lib/pgbus/concurrency/semaphore.rb', line 9
def acquire(key, max_value, duration)
  expires_at = Time.current + duration
  Pgbus::Semaphore.acquire!(key, max_value, expires_at)
end
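The acquire/release contract described above can be sketched without a database. This is a minimal in-memory model, not the pgbus implementation: the class `InMemorySemaphore`, its `Row` struct, and the `now:` keyword are hypothetical names introduced for illustration, and the choice to drop a row once its value reaches zero is an assumption, since the body of `Pgbus::Semaphore.acquire!` is not shown here.

```ruby
# Hypothetical in-memory stand-in for the semaphore table; not part of pgbus.
class InMemorySemaphore
  Row = Struct.new(:value, :expires_at)

  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  # Returns :acquired if a slot was free, :blocked if max_value is reached.
  def acquire(key, max_value, duration, now: Time.now)
    @lock.synchronize do
      row = @rows[key] || Row.new(0, nil)
      return :blocked if row.value >= max_value

      row.value += 1
      row.expires_at = now + duration
      @rows[key] = row
      :acquired
    end
  end

  # Releases one slot. Assumption: the row is removed once no slots are held.
  def release(key)
    @lock.synchronize do
      row = @rows[key]
      return unless row

      row.value -= 1
      @rows.delete(key) if row.value <= 0
    end
  end

  # Number of slots currently held for the key, or nil if there is no row.
  def current_value(key)
    @lock.synchronize { @rows[key]&.value }
  end
end

sem = InMemorySemaphore.new
first  = sem.acquire("import", 2, 60)  # :acquired
second = sem.acquire("import", 2, 60)  # :acquired
third  = sem.acquire("import", 2, 60)  # :blocked, both slots are taken
sem.release("import")
fourth = sem.acquire("import", 2, 60)  # :acquired again after the release
```

A caller that receives `:blocked` would typically retry later or requeue the work; how pgbus itself reacts is outside what this page documents.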
.current_value(key) ⇒ Object
Check current value for a key. Useful for testing/monitoring.
# File 'lib/pgbus/concurrency/semaphore.rb', line 32
def current_value(key)
  Pgbus::Semaphore.where(key: key).pick(:value)
end
.expire_stale ⇒ Object
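Delete semaphores that have expired (safety net for crashed workers). Returns an array of hashes, one per deleted row, each of the form { "key" => key }. Uses DELETE … RETURNING so that selecting and deleting the expired rows happen in a single statement (no race between pluck and delete).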
# File 'lib/pgbus/concurrency/semaphore.rb', line 22
def expire_stale
  result = Pgbus::Semaphore.connection.exec_query(
    "DELETE FROM pgbus_semaphores WHERE expires_at < $1 RETURNING key",
    "Pgbus Semaphore Expire",
    [Time.current]
  )
  result.rows.map { |row| { "key" => row[0] } }
end
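To illustrate the race that the single-statement delete avoids, the sketch below models the table as a plain Hash of key to expiry time. All helper names (`pluck_expired`, `delete_keys`, `delete_expired_returning`) are hypothetical and exist only for this example; the real method runs the SQL shown above, and the Hash operations here merely stand in for it in a single-threaded simulation.

```ruby
# Hypothetical in-memory model of pgbus_semaphores: key => expires_at.

# Step 1 of the racy two-step variant: read the expired keys.
def pluck_expired(table, now)
  table.select { |_key, expires_at| expires_at < now }.keys
end

# Step 2 of the racy variant: delete by the keys read earlier.
def delete_keys(table, keys)
  keys.each { |key| table.delete(key) }
end

# Single-step variant: filter and delete together, returning the removed keys
# in the same shape as expire_stale (an array of { "key" => key } hashes).
def delete_expired_returning(table, now)
  expired = table.select { |_key, expires_at| expires_at < now }.keys
  expired.each { |key| table.delete(key) }
  expired.map { |key| { "key" => key } }
end

now = Time.at(1_000)

# Racy: a worker writes a fresh expiry between the pluck and the delete.
racy = { "import" => now - 5 }
stale = pluck_expired(racy, now)
racy["import"] = now + 60           # fresh lease written in the gap
delete_keys(racy, stale)            # the fresh lease is deleted by mistake
racy_lost_fresh_lease = !racy.key?("import")

# Single step: the same write lands before the statement, so nothing is removed.
atomic = { "import" => now - 5 }
atomic["import"] = now + 60
removed = delete_expired_returning(atomic, now)

# Ordinary sweep: only the expired row is removed and reported.
mixed = { "old" => now - 1, "live" => now + 1 }
swept = delete_expired_returning(mixed, now)
```

In the two-step version the delete acts on a stale snapshot; with one statement the expiry check and the delete see the same row state.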