Class: Pgbus::Process::QueueLock
- Inherits:
-
Object
- Object
- Pgbus::Process::QueueLock
- Defined in:
- lib/pgbus/process/queue_lock.rb
Overview
Manages PostgreSQL advisory locks for single-active-consumer mode. Only one worker process can hold the lock for a given queue at a time. Other workers skip the queue and process other queues instead.
Uses pg_try_advisory_lock (non-blocking) so workers never wait —they simply skip queues they can’t lock and try again next cycle.
Locks are session-level and automatically released when the connection closes (including on crash), so no manual cleanup is needed.
Constant Summary collapse
- LOCK_NAMESPACE =
Use a fixed namespace to avoid collision with application advisory locks. CRC32 of “pgbus_queue_lock” = 0x5067_6275
0x5067_6275
Instance Method Summary collapse
- #held_queues ⇒ Object
-
#initialize ⇒ QueueLock
constructor
A new instance of QueueLock.
- #locked?(queue_name) ⇒ Boolean
-
#try_lock(queue_name) ⇒ Object
Try to acquire an advisory lock for the given queue name.
-
#unlock(queue_name) ⇒ Object
Release the advisory lock for a queue.
-
#unlock_all ⇒ Object
Release all held locks.
Constructor Details
#initialize ⇒ QueueLock
Returns a new instance of QueueLock.
21 22 23 |
# File 'lib/pgbus/process/queue_lock.rb', line 21 def initialize @held_locks = Concurrent::Map.new end |
Instance Method Details
#held_queues ⇒ Object
67 68 69 |
# File 'lib/pgbus/process/queue_lock.rb', line 67 def held_queues @held_locks.keys end |
#locked?(queue_name) ⇒ Boolean
63 64 65 |
# File 'lib/pgbus/process/queue_lock.rb', line 63 def locked?(queue_name) @held_locks.key?(queue_name) end |
#try_lock(queue_name) ⇒ Object
Try to acquire an advisory lock for the given queue name. Returns true if acquired (or already held), false if another process holds it.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pgbus/process/queue_lock.rb', line 27 def try_lock(queue_name) return true if @held_locks[queue_name] lock_id = lock_id_for(queue_name) acquired = connection.select_value( "SELECT pg_try_advisory_lock(#{LOCK_NAMESPACE}, #{lock_id})" ) if acquired @held_locks[queue_name] = lock_id true else false end rescue StandardError => e Pgbus.logger.warn { "[Pgbus] Advisory lock failed for #{queue_name}: #{e.}" } false end |
#unlock(queue_name) ⇒ Object
Release the advisory lock for a queue. Called during shutdown.
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/pgbus/process/queue_lock.rb', line 47 def unlock(queue_name) lock_id = @held_locks.delete(queue_name) return unless lock_id connection.select_value( "SELECT pg_advisory_unlock(#{LOCK_NAMESPACE}, #{lock_id})" ) rescue StandardError => e Pgbus.logger.warn { "[Pgbus] Advisory unlock failed for #{queue_name}: #{e.}" } end |
#unlock_all ⇒ Object
Release all held locks.
59 60 61 |
# File 'lib/pgbus/process/queue_lock.rb', line 59 def unlock_all @held_locks.each_key { |q| unlock(q) } end |