Module: Pgbus::Concurrency::Semaphore

Defined in:
lib/pgbus/concurrency/semaphore.rb

Class Method Summary collapse

Class Method Details

.acquire(key, max_value, duration) ⇒ Object

Attempt to acquire a slot in the semaphore for the given key. Returns :acquired if a slot was available, :blocked if the limit is reached.



9
10
11
12
# File 'lib/pgbus/concurrency/semaphore.rb', line 9

def acquire(key, max_value, duration)
  expires_at = Time.current + duration
  Pgbus::Semaphore.acquire!(key, max_value, expires_at)
end

.current_value(key) ⇒ Object

Check current value for a key. Useful for testing/monitoring.



32
33
34
# File 'lib/pgbus/concurrency/semaphore.rb', line 32

def current_value(key)
  Pgbus::Semaphore.where(key: key).pick(:value)
end

.expire_staleObject

Delete semaphores that have expired (safety net for crashed workers). Returns an array of hashes with expired keys. Uses DELETE … RETURNING for atomicity (no race between pluck and delete).



22
23
24
25
26
27
28
29
# File 'lib/pgbus/concurrency/semaphore.rb', line 22

def expire_stale
  result = Pgbus::Semaphore.connection.exec_query(
    "DELETE FROM pgbus_semaphores WHERE expires_at < $1 RETURNING key",
    "Pgbus Semaphore Expire",
    [Time.current]
  )
  result.rows.map { |row| { "key" => row[0] } }
end

.release(key) ⇒ Object

Release one slot in the semaphore. Called after a job completes.



15
16
17
# File 'lib/pgbus/concurrency/semaphore.rb', line 15

def release(key)
  Pgbus::Semaphore.where(key: key).update_all("value = GREATEST(value - 1, 0)")
end