Class: Pgbus::BlockedExecution

Inherits:
BusRecord
  • Object
show all
Defined in:
app/models/pgbus/blocked_execution.rb

Class Method Summary collapse

Class Method Details

.release_next!(concurrency_key) ⇒ Object

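Atomic dequeue: DELETE the highest-priority non-expired row for the given concurrency key (lowest `priority` value first, then oldest `created_at`), selected with FOR UPDATE SKIP LOCKED. Returns { queue_name:, payload: } or nil when no row qualifies.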



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'app/models/pgbus/blocked_execution.rb', line 12

# Atomically removes and returns the next blocked execution for a
# concurrency key.
#
# Runs a single DELETE whose target id comes from a subquery over
# pgbus_blocked_executions: rows whose concurrency_key equals the given key
# and whose expires_at is at or after the current time, ordered by priority
# ascending and then created_at ascending, limited to one row and taken with
# FOR UPDATE SKIP LOCKED.
#
# @param concurrency_key [Object] value bound to $1 in the query
#   (presumably a String key; confirm against callers)
# @return [Hash, nil] +{ queue_name:, payload: }+ built from the deleted row,
#   or nil when the DELETE returned no row. A String payload is parsed as
#   JSON; any other payload value is returned unchanged.
def self.release_next!(concurrency_key)
  # Expiry cutoff is captured once, before the query is issued, and bound as $2.
  cutoff = Time.current

  dequeue_sql = <<~SQL
    DELETE FROM pgbus_blocked_executions
    WHERE id = (
      SELECT id FROM pgbus_blocked_executions
      WHERE concurrency_key = $1
        AND expires_at >= $2
      ORDER BY priority ASC, created_at ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING queue_name, payload
  SQL

  released = connection.exec_query(
    dequeue_sql,
    "Pgbus Blocked Release",
    [concurrency_key, cutoff]
  ).first
  return nil unless released

  # The payload may come back as raw JSON text or as an already-decoded value.
  raw_payload = released["payload"]
  decoded = raw_payload.is_a?(String) ? JSON.parse(raw_payload) : raw_payload

  { queue_name: released["queue_name"], payload: decoded }
end