Class: Pgbus::Semaphore

Inherits:
BusRecord
  • Object
show all
Defined in:
app/models/pgbus/semaphore.rb

Class Method Summary collapse

Class Method Details

.acquire!(key, max_value, expires_at) ⇒ Object

Atomic conditional UPSERT. Returns :acquired or :blocked.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'app/models/pgbus/semaphore.rb', line 10

def self.acquire!(key, max_value, expires_at)
  result = connection.exec_query(
    <<~SQL,
      INSERT INTO pgbus_semaphores (key, value, max_value, expires_at)
      VALUES ($1, 1, $2, $3)
      ON CONFLICT (key) DO UPDATE
        SET value = pgbus_semaphores.value + 1,
            max_value = EXCLUDED.max_value,
            expires_at = GREATEST(pgbus_semaphores.expires_at, EXCLUDED.expires_at)
        WHERE pgbus_semaphores.value < EXCLUDED.max_value
      RETURNING value
    SQL
    "Pgbus Semaphore Acquire",
    [key, max_value, expires_at]
  )

  result.any? ? :acquired : :blocked
end