Module: Pgbus::Concurrency::Semaphore
- Defined in: lib/pgbus/concurrency/semaphore.rb
Class Method Summary
- .acquire(key, max_value, duration) ⇒ Object
  Attempt to acquire a slot in the semaphore for the given key.
- .current_value(key) ⇒ Object
  Check current value for a key.
- .expire_stale ⇒ Object
  Delete semaphores that have expired (safety net for crashed workers).
- .release(key) ⇒ Object
  Release one slot in the semaphore.
Class Method Details
.acquire(key, max_value, duration) ⇒ Object
Attempt to acquire a slot in the semaphore for the given key. Returns :acquired if a slot was available, :blocked if the limit is reached.
    # File 'lib/pgbus/concurrency/semaphore.rb', line 9
    def acquire(key, max_value, duration)
      expires_at = Time.current + duration
      Pgbus::Semaphore.acquire!(key, max_value, expires_at)
    end
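As a rough illustration of how a caller might pair .acquire with .release, the sketch below wraps a block of work so that the slot is returned even if the work raises. The with_slot helper and the FakeSemaphore class are hypothetical and are not part of Pgbus. The fake exists only so the example runs without a database, and it ignores the duration argument, whereas the real module turns it into an expiry timestamp.

```ruby
# Hypothetical helper, not part of Pgbus: run a block only when a slot is free.
# `semaphore` is any object exposing acquire(key, max_value, duration) and
# release(key), such as Pgbus::Concurrency::Semaphore.
def with_slot(semaphore, key, max_value:, duration:)
  # acquire returns :acquired or :blocked, per the documentation above.
  return :blocked unless semaphore.acquire(key, max_value, duration) == :acquired

  begin
    yield
  ensure
    # Give the slot back even if the block raises.
    semaphore.release(key)
  end
end

# In-memory stand-in (an assumption for illustration only). It counts slots
# per key and ignores `duration`.
class FakeSemaphore
  def initialize
    @values = Hash.new(0)
  end

  def acquire(key, max_value, _duration)
    return :blocked if @values[key] >= max_value

    @values[key] += 1
    :acquired
  end

  def release(key)
    @values[key] -= 1 if @values[key].positive?
  end

  def current_value(key)
    @values[key]
  end
end
```

With the real module, the call would presumably look like with_slot(Pgbus::Concurrency::Semaphore, "reports", max_value: 3, duration: 300) { ... }. Passing the semaphore in as an argument is a design choice of this sketch that keeps the helper testable without a database.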
.current_value(key) ⇒ Object
Check current value for a key. Returns nil when no row exists for the key. Useful for testing/monitoring.
    # File 'lib/pgbus/concurrency/semaphore.rb', line 32
    def current_value(key)
      Pgbus::Semaphore.where(key: key).pick(:value)
    end
.expire_stale ⇒ Object
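Delete semaphores that have expired (safety net for crashed workers). Returns an array of hashes, one per deleted row, each of the form { "key" => key }. A single DELETE ... RETURNING statement is used for atomicity, so there is no window between reading the expired keys and deleting them (as there would be with a separate pluck and delete).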
    # File 'lib/pgbus/concurrency/semaphore.rb', line 22
    def expire_stale
      result = Pgbus::Semaphore.connection.exec_query(
        "DELETE FROM pgbus_semaphores WHERE expires_at < $1 RETURNING key",
        "Pgbus Semaphore Expire",
        [Time.current]
      )
      result.rows.map { |row| { "key" => row[0] } }
    end
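Since .expire_stale returns the deleted keys, a periodic cleanup task can report what it removed. The sketch below is one possible shape for such a task, not something the library provides. The sweep_expired helper and the StubSemaphore class are hypothetical, and the stub returns canned rows so the example runs without a database.

```ruby
# Hypothetical helper, not part of Pgbus: run one sweep and return the keys
# that were removed. `semaphore` is any object that responds to expire_stale
# and returns rows shaped like { "key" => "..." }, as documented above.
def sweep_expired(semaphore, out: $stdout)
  keys = semaphore.expire_stale.map { |row| row["key"] }
  out.puts("expired #{keys.size} stale semaphore(s): #{keys.join(', ')}") unless keys.empty?
  keys
end

# Stand-in (an assumption for illustration only) that hands back canned rows.
class StubSemaphore
  def initialize(rows)
    @rows = rows
  end

  def expire_stale
    rows = @rows
    @rows = [] # a second sweep finds nothing, mirroring a DELETE
    rows
  end
end
```

In an application, the call would presumably be sweep_expired(Pgbus::Concurrency::Semaphore), run from whatever scheduler the deployment already uses.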