Class: Pgbus::Process::QueueLock

Inherits:
Object
Defined in:
lib/pgbus/process/queue_lock.rb

Overview

Manages PostgreSQL advisory locks for single-active-consumer mode. Only one worker process can hold the lock for a given queue at a time. Other workers skip the queue and process other queues instead.

Uses pg_try_advisory_lock (non-blocking), so workers never wait: they skip any queue they can’t lock and try again on the next cycle.

Locks are session-level and automatically released when the connection closes (including on crash), so no manual cleanup is needed.
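The skip-and-retry behaviour described above can be illustrated without a database. The following is a minimal sketch, not Pgbus code: FakeLockTable, SketchQueueLock and poll_cycle are hypothetical names, and the in-memory table only imitates the "first caller wins, others get false immediately" semantics of pg_try_advisory_lock.

```ruby
# Hypothetical in-memory stand-in for PostgreSQL's advisory lock table,
# shared by all simulated workers. Not part of Pgbus.
class FakeLockTable
  def initialize
    @owners = {}
    @mutex = Mutex.new
  end

  # Like pg_try_advisory_lock: answers immediately, never blocks.
  def try_acquire(key, owner)
    @mutex.synchronize do
      @owners[key] ||= owner
      @owners[key].equal?(owner)
    end
  end

  def release(key, owner)
    @mutex.synchronize { @owners.delete(key) if @owners[key].equal?(owner) }
  end
end

# Hypothetical stand-in exposing the same public method names as QueueLock.
class SketchQueueLock
  def initialize(table)
    @table = table
    @held = {}
  end

  def try_lock(queue_name)
    return true if @held[queue_name]

    @held[queue_name] = true if @table.try_acquire(queue_name, self)
    @held.key?(queue_name)
  end

  def unlock(queue_name)
    @table.release(queue_name, self) if @held.delete(queue_name)
  end

  def unlock_all
    @held.keys.each { |queue_name| unlock(queue_name) }
  end
end

# One polling cycle: keep only the queues this worker managed to lock.
def poll_cycle(lock, queues)
  queues.select { |queue_name| lock.try_lock(queue_name) }
end

table    = FakeLockTable.new
worker_a = SketchQueueLock.new(table)
worker_b = SketchQueueLock.new(table)

first_cycle_a  = poll_cycle(worker_a, %w[emails reports])
first_cycle_b  = poll_cycle(worker_b, %w[emails reports billing])
worker_a.unlock_all # e.g. worker A shuts down
second_cycle_b = poll_cycle(worker_b, %w[emails reports billing])
```

In this simulation worker B is never blocked: it works on whatever it can lock in the first cycle and picks up the remaining queues once worker A has released them.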

Constant Summary

LOCK_NAMESPACE = 0x5067_6275

Use a fixed namespace to avoid collisions with application advisory locks. CRC32 of “pgbus_queue_lock” = 0x5067_6275
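The two-argument form of pg_try_advisory_lock takes two 32-bit signed integers, so both the namespace and the per-queue key have to fit in that range. #try_lock calls a lock_id_for helper whose body is not shown on this page; the sketch below is only one plausible way to derive such a key, and sketch_lock_id_for is a hypothetical name, not the library's implementation.

```ruby
require "zlib"

LOCK_NAMESPACE = 0x5067_6275
INT4_RANGE = (-2**31..2**31 - 1)

# Assumption: hash the queue name with CRC32 and reinterpret the unsigned
# 32-bit result as a signed 32-bit integer so PostgreSQL accepts it as int4.
def sketch_lock_id_for(queue_name)
  [Zlib.crc32(queue_name)].pack("L").unpack1("l")
end

namespace_fits  = INT4_RANGE.cover?(LOCK_NAMESPACE)
emails_id       = sketch_lock_id_for("emails")
reports_id      = sketch_lock_id_for("reports")

# The constant's four bytes, read big-endian.
namespace_bytes = [LOCK_NAMESPACE].pack("N")
```

Under this assumption the same queue name always maps to the same key in every process, which is what lets separate workers contend for the same lock. The namespace value fits in int4, and its four bytes read as the ASCII string "Pgbu".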

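Instance Method Summary

  • #held_queues ⇒ Object
  • #initialize ⇒ QueueLock (constructor): Returns a new instance of QueueLock.
  • #locked?(queue_name) ⇒ Boolean
  • #try_lock(queue_name) ⇒ Object: Try to acquire an advisory lock for the given queue name.
  • #unlock(queue_name) ⇒ Object: Release the advisory lock for a queue.
  • #unlock_all ⇒ Object: Release all held locks.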

Constructor Details

#initialize ⇒ QueueLock

Returns a new instance of QueueLock.



# File 'lib/pgbus/process/queue_lock.rb', line 21

def initialize
  @held_locks = Concurrent::Map.new
end

Instance Method Details

#held_queues ⇒ Object

Returns the names of the queues whose locks this instance currently holds.



# File 'lib/pgbus/process/queue_lock.rb', line 67

def held_queues
  @held_locks.keys
end

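#locked?(queue_name) ⇒ Boolean

Reports whether this instance holds the lock for the given queue. The check reads local state only and does not query PostgreSQL, so it says nothing about locks held by other processes.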

Returns:

  • (Boolean)


# File 'lib/pgbus/process/queue_lock.rb', line 63

def locked?(queue_name)
  @held_locks.key?(queue_name)
end

#try_lock(queue_name) ⇒ Object

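Try to acquire an advisory lock for the given queue name. Returns true if acquired (or already held by this instance), false if another process holds it. Also returns false if the query raises an error; the error is logged as a warning rather than propagated.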



# File 'lib/pgbus/process/queue_lock.rb', line 27

def try_lock(queue_name)
  return true if @held_locks[queue_name]

  lock_id = lock_id_for(queue_name)
  acquired = connection.select_value(
    "SELECT pg_try_advisory_lock(#{LOCK_NAMESPACE}, #{lock_id})"
  )

  if acquired
    @held_locks[queue_name] = lock_id
    true
  else
    false
  end
rescue StandardError => e
  Pgbus.logger.warn { "[Pgbus] Advisory lock failed for #{queue_name}: #{e.message}" }
  false
end
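The three outcomes of #try_lock (acquired, held elsewhere, query failed) and the already-held shortcut can be exercised with a test double. This is a sketch under stated assumptions: RecordingConnection and SketchLock are hypothetical, the connection is injected rather than looked up, and the lock id is passed in directly because lock_id_for is not shown on this page.

```ruby
# Hypothetical test double: records SQL and replays canned results
# instead of talking to PostgreSQL.
class RecordingConnection
  attr_reader :queries

  def initialize(results)
    @results = results
    @queries = []
  end

  def select_value(sql)
    @queries << sql
    result = @results.shift
    raise result if result.is_a?(Exception)

    result
  end
end

# Same control flow as QueueLock#try_lock, reduced for illustration.
class SketchLock
  NAMESPACE = 0x5067_6275

  def initialize(connection)
    @connection = connection
    @held = {}
  end

  def try_lock(queue_name, lock_id)
    return true if @held[queue_name] # already held: no query is sent

    acquired = @connection.select_value(
      "SELECT pg_try_advisory_lock(#{NAMESPACE}, #{lock_id})"
    )
    @held[queue_name] = lock_id if acquired
    acquired ? true : false
  rescue StandardError
    false # a failed query is treated as "not acquired"
  end
end

conn = RecordingConnection.new([true, false, RuntimeError.new("connection lost")])
lock = SketchLock.new(conn)

outcomes = [
  lock.try_lock("emails", 1),  # acquired
  lock.try_lock("emails", 1),  # already held by this instance
  lock.try_lock("reports", 2), # held by another process
  lock.try_lock("billing", 3)  # query raised
]
```

The early return matters beyond saving a round trip: PostgreSQL session-level advisory locks stack, so a second successful acquisition on the same session would need a second unlock before the lock is actually released.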

#unlock(queue_name) ⇒ Object

Release the advisory lock for a queue. Called during shutdown. Does nothing if this instance does not hold the lock; errors are logged as warnings, not raised.



# File 'lib/pgbus/process/queue_lock.rb', line 47

def unlock(queue_name)
  lock_id = @held_locks.delete(queue_name)
  return unless lock_id

  connection.select_value(
    "SELECT pg_advisory_unlock(#{LOCK_NAMESPACE}, #{lock_id})"
  )
rescue StandardError => e
  Pgbus.logger.warn { "[Pgbus] Advisory unlock failed for #{queue_name}: #{e.message}" }
end

#unlock_all ⇒ Object

Release all held locks.



# File 'lib/pgbus/process/queue_lock.rb', line 59

def unlock_all
  @held_locks.each_key { |q| unlock(q) }
end
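Because locks are also released when the connection closes, calling #unlock_all is a courtesy that hands queues over promptly rather than a correctness requirement. A minimal sketch of releasing locks on the way out, assuming a worker wrapper of your own: StubQueueLock and run_worker are hypothetical and only mimic the public method names documented here.

```ruby
# Hypothetical stub with the same public method names as QueueLock,
# backed by a plain Hash instead of PostgreSQL.
class StubQueueLock
  def initialize
    @held = {}
  end

  def try_lock(queue_name)
    @held[queue_name] = true
  end

  def unlock(queue_name)
    @held.delete(queue_name)
  end

  def unlock_all
    @held.keys.each { |queue_name| unlock(queue_name) }
  end

  def held_queues
    @held.keys
  end
end

# Hypothetical worker wrapper: lock what we can, do the work, always release.
def run_worker(lock, queues)
  queues.each { |queue_name| lock.try_lock(queue_name) }
  yield lock
ensure
  # Runs on normal return and when the block raises.
  lock.unlock_all
end

lock = StubQueueLock.new
held_during_work = nil

begin
  run_worker(lock, %w[emails reports]) do |l|
    held_during_work = l.held_queues.dup
    raise "simulated failure during work"
  end
rescue RuntimeError
  # The failure is expected here; the point is what the ensure block did.
end

held_after_shutdown = lock.held_queues
```

With a real QueueLock, the release only takes effect if it runs on the same database session that acquired the lock, since the locks are session-level.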