Module: Pgbus::Concurrency::Semaphore

Defined in:
lib/pgbus/concurrency/semaphore.rb

Class Method Summary collapse

Class Method Details

.acquire(key, max_value, duration) ⇒ Object

Attempt to acquire a slot in the semaphore for the given key. Returns :acquired if a slot was available, :blocked if the limit is reached.



9
10
11
12
# File 'lib/pgbus/concurrency/semaphore.rb', line 9

def acquire(key, max_value, duration)
  expires_at = Time.current + duration
  Pgbus::Semaphore.acquire!(key, max_value, expires_at)
end

.current_value(key) ⇒ Object

Check current value for a key. Useful for testing/monitoring.



32
33
34
# File 'lib/pgbus/concurrency/semaphore.rb', line 32

def current_value(key)
  Pgbus::Semaphore.where(key: key).pick(:value)
end

.expire_staleObject

Delete semaphores that have expired (safety net for crashed workers). Returns an array of hashes with expired keys. Uses DELETE ... RETURNING for atomicity (no race between pluck and delete).



22
23
24
25
26
27
28
29
# File 'lib/pgbus/concurrency/semaphore.rb', line 22

def expire_stale
  result = Pgbus::Semaphore.connection.exec_query(
    "DELETE FROM pgbus_semaphores WHERE expires_at < $1 RETURNING key",
    "Pgbus Semaphore Expire",
    [Time.current]
  )
  result.rows.map { |row| { "key" => row[0] } }
end

.release(key) ⇒ Object

Release one slot in the semaphore. Called after a job completes.



15
16
17
# File 'lib/pgbus/concurrency/semaphore.rb', line 15

def release(key)
  Pgbus::Semaphore.where(key: key).update_all("value = GREATEST(value - 1, 0)")
end