Class: GoodJob::AdvisoryLockable::AdvisoryLockCounter
- Inherits: Object
- Defined in: app/models/concerns/good_job/advisory_lockable.rb
Overview
Tracks advisory lock counts per connection per key for two purposes:
- CTE phantom cleanup: In the CTE-based locking query, the query planner can sometimes evaluate pg_try_advisory_lock more than once for the same row, creating phantom re-entrant session locks. When a key has been CTE-locked, the final unlock uses advisory_unlock_key! to fully release any phantoms.
- Xact lock awareness: Transaction-scoped locks (pg_advisory_xact_lock) are tracked because pg_locks cannot distinguish them from session locks. When an xact lock is active on the same key, advisory_unlock_key! cannot be used (it would loop forever), so cleanup falls back to looping pg_advisory_unlock until it returns false. Xact counts are pruned when no transaction is open on the connection (since xact locks release at the outermost COMMIT, not at savepoint release).
Thread safety: a leased connection is only ever accessed by the thread that leased it, so per-connection bookkeeping does not require additional synchronization.
Per-key structure: [session_count, xact_txns, cte]
xact_txns: Array of WeakRefs to AR transaction objects
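One way to read the two purposes above is as a decision on the [session_count, active_xact_count, cte] triple for a key once an unlock has been recorded. The sketch below illustrates that reading only; it is not GoodJob's implementation, and cleanup_strategy and the symbols it returns are hypothetical names.

```ruby
# Hypothetical helper (not part of GoodJob). Takes the
# [session_count, active_xact_count, cte] triple for a key after an unlock
# has been recorded and names the cleanup path the overview describes.
def cleanup_strategy(session_count, active_xact_count, cte)
  # Session locks remain, or the key was never CTE-locked: no phantom cleanup.
  return :none unless cte && session_count.zero?

  if active_xact_count.positive?
    # An xact lock is still held on the key, so advisory_unlock_key! would
    # never finish; unlock repeatedly until pg_advisory_unlock returns false.
    :loop_pg_advisory_unlock
  else
    # No xact lock: advisory_unlock_key! can fully release any phantoms.
    :advisory_unlock_key
  end
end

cleanup_strategy(1, 0, true)  # => :none
cleanup_strategy(0, 0, true)  # => :advisory_unlock_key
cleanup_strategy(0, 1, true)  # => :loop_pg_advisory_unlock
cleanup_strategy(0, 0, false) # => :none
```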
Instance Method Summary
- #[](conn) ⇒ Object
- #[]=(conn, value) ⇒ Object
- #clear_session_locks(conn) ⇒ Object
  Clear session-level lock bookkeeping for a connection, preserving any active transaction-scoped lock entries.
- #counts_for(conn, key) ⇒ Object
  Returns [session_count, active_xact_count, cte] for a specific key.
- #delete(conn) ⇒ Object
- #initialize ⇒ AdvisoryLockCounter (constructor)
  A new instance of AdvisoryLockCounter.
- #prune(conn) ⇒ Object
  Remove xact entries whose transactions are no longer open (committed, rolled back, or GC’d).
- #record_lock(conn, key, cte: false, xact: false) ⇒ Object
  Record a lock acquisition.
- #record_unlock(conn, key) ⇒ Object
  Record a lock release (session-level only; xact locks cannot be manually unlocked).
Constructor Details
#initialize ⇒ AdvisoryLockCounter
Returns a new instance of AdvisoryLockCounter.
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 50
    def initialize
      if defined?(ObjectSpace::WeakKeyMap)
        # WeakKeyMap (Ruby 3.3+) holds weak keys with strong values:
        # entries are automatically removed when the connection is GC'd.
        @map = ObjectSpace::WeakKeyMap.new
        @use_object_id = false
      else
        # On older Rubies, key by object_id with a WeakRef to the
        # connection stored alongside the data. Stale entries (where
        # the WeakRef is dead) are pruned on every access.
        @map = {}
        @use_object_id = true
      end
    end
Instance Method Details
#[](conn) ⇒ Object
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 65
    def [](conn)
      if @use_object_id
        entry = @map[oid(conn)]
        return unless entry

        ref, counts = entry
        return counts if ref.weakref_alive? && ref.__getobj__.equal?(conn)

        @map.delete(oid(conn))
        nil
      else
        @map[conn]
      end
    end
#[]=(conn, value) ⇒ Object
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 80
    def []=(conn, value)
      if @use_object_id
        @map[oid(conn)] = [WeakRef.new(conn), value]
      else
        @map[conn] = value
      end
    end
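The fallback path used on older Rubies can be tried in isolation. The following is a minimal sketch, not GoodJob code: ObjectIdStore is a hypothetical class, and it assumes the private oid helper (not shown on this page) simply returns the object's object_id.

```ruby
require "weakref"

# Hypothetical stand-in for the object_id fallback path.
class ObjectIdStore
  def initialize
    @map = {}
  end

  # Store the value next to a WeakRef so the store itself never keeps
  # the key object alive.
  def []=(obj, value)
    @map[obj.object_id] = [WeakRef.new(obj), value]
  end

  # Return the value only if the entry still refers to this exact object;
  # otherwise drop the stale entry and return nil.
  def [](obj)
    entry = @map[obj.object_id]
    return unless entry

    ref, value = entry
    return value if ref.weakref_alive? && ref.__getobj__.equal?(obj)

    @map.delete(obj.object_id)
    nil
  end
end

conn = Object.new # stands in for a database connection
store = ObjectIdStore.new
store[conn] = { 42 => [1, [], false] }

found   = store[conn]       # the hash stored above
missing = store[Object.new] # nil: nothing stored for a different object
```

The identity check with equal? means an entry is only returned for the same object that stored it, never for another object that happens to map to the same key.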
#clear_session_locks(conn) ⇒ Object
Clear session-level lock bookkeeping for a connection, preserving any active transaction-scoped lock entries.
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 99
    def clear_session_locks(conn)
      counts = self[conn]
      return unless counts

      counts.each do |key, (_session_count, xact_txns, _cte)|
        if xact_txns.empty?
          counts.delete(key)
        else
          counts[key] = [0, xact_txns, false]
        end
      end
    end
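To see the effect on the per-connection hash, the same loop can be run on a plain Hash. This is an illustrative sketch: the keys and the :txn placeholder (standing in for a WeakRef to a transaction) are made up.

```ruby
# Per-key entries are [session_count, xact_txns, cte].
counts = {
  1 => [2, [], true],     # session locks only
  2 => [1, [:txn], true]  # session lock plus an active xact lock
}

counts.each do |key, (_session_count, xact_txns, _cte)|
  if xact_txns.empty?
    counts.delete(key)                  # nothing left to track for this key
  else
    counts[key] = [0, xact_txns, false] # keep only the xact bookkeeping
  end
end

counts # => {2 => [0, [:txn], false]}
```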
#counts_for(conn, key) ⇒ Object
Returns [session_count, active_xact_count, cte] for a specific key.
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 161
    def counts_for(conn, key)
      prune(conn)
      entry = self[conn]&.dig(key)
      return [0, 0, false] unless entry

      session_count, xact_txns, cte = entry
      [session_count, xact_txns.size, cte]
    end
#delete(conn) ⇒ Object
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 88
    def delete(conn)
      if @use_object_id
        @map.delete(oid(conn))
        prune_stale_entries
      else
        @map.delete(conn)
      end
    end
#prune(conn) ⇒ Object
Remove xact entries whose transactions are no longer open (committed, rolled back, or GC’d).
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 172
    def prune(conn)
      counts = self[conn]
      return unless counts

      counts.each do |key, (session_count, xact_txns, cte)|
        live_txns = xact_txns.select do |ref|
          ref.weakref_alive? && ref.open?
        rescue WeakRef::RefError
          false
        end

        if session_count.zero? && live_txns.empty?
          counts.delete(key)
        elsif live_txns.size != xact_txns.size
          counts[key] = [session_count, live_txns, cte]
        end
      end
    end
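The liveness filter at the heart of prune can be exercised without a database. In this sketch FakeTransaction and live_refs are hypothetical names; the only assumption about real transaction objects is that they respond to open?, as the code above relies on.

```ruby
require "weakref"

# Hypothetical stand-in for a transaction object.
class FakeTransaction
  def initialize(open)
    @open = open
  end

  def open?
    @open
  end
end

# Keep only refs whose target is still alive and still open. WeakRef
# forwards open? to its target and raises WeakRef::RefError if the target
# was collected between the two checks.
def live_refs(refs)
  refs.select do |ref|
    ref.weakref_alive? && ref.open?
  rescue WeakRef::RefError
    false
  end
end

open_txn   = FakeTransaction.new(true)
closed_txn = FakeTransaction.new(false)
refs = [WeakRef.new(open_txn), WeakRef.new(closed_txn)]

live = live_refs(refs)
live.size # => 1
```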
#record_lock(conn, key, cte: false, xact: false) ⇒ Object
Record a lock acquisition. Per-key structure: [session_count, xact_txns, cte]
xact_txns is an Array of WeakRefs to transaction objects
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 131
    def record_lock(conn, key, cte: false, xact: false)
      prune(conn)
      counts = self[conn] || (self[conn] = {})
      session_count, xact_txns, was_cte = counts[key] || [0, [], false]
      if xact
        xact_txns += [WeakRef.new(conn.current_transaction)]
      else
        session_count += 1
      end
      counts[key] = [session_count, xact_txns, was_cte || cte]
    end
#record_unlock(conn, key) ⇒ Object
Record a lock release (session-level only; xact locks cannot be manually unlocked). Returns [new_session_count, active_xact_count, cte].
    # File 'app/models/concerns/good_job/advisory_lockable.rb', line 145
    def record_unlock(conn, key)
      prune(conn)
      counts = self[conn]
      return [0, 0, false] unless counts

      session_count, xact_txns, cte = counts[key] || [0, [], false]
      new_session_count = [session_count - 1, 0].max
      if new_session_count.zero? && xact_txns.empty?
        counts.delete(key)
      else
        counts[key] = [new_session_count, xact_txns, cte]
      end
      [new_session_count, xact_txns.size, cte]
    end
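The interplay of record_lock and record_unlock is easiest to follow on a reduced model. The sketch below is a simplification under stated assumptions, not the class itself: KeyCounts is a hypothetical name, it tracks a single connection in a plain Hash, represents xact locks as opaque tokens instead of WeakRefs, and omits pruning.

```ruby
# Simplified, hypothetical model of the per-key bookkeeping.
class KeyCounts
  def initialize
    @counts = {}
  end

  def record_lock(key, cte: false, xact_token: nil)
    session_count, xact_txns, was_cte = @counts[key] || [0, [], false]
    if xact_token
      xact_txns += [xact_token] # transaction-scoped lock
    else
      session_count += 1        # session-level lock
    end
    # The cte flag is sticky: once set for a key it stays set.
    @counts[key] = [session_count, xact_txns, was_cte || cte]
  end

  def record_unlock(key)
    session_count, xact_txns, cte = @counts[key] || [0, [], false]
    new_session_count = [session_count - 1, 0].max # never below zero
    if new_session_count.zero? && xact_txns.empty?
      @counts.delete(key)
    else
      @counts[key] = [new_session_count, xact_txns, cte]
    end
    [new_session_count, xact_txns.size, cte]
  end
end

counts = KeyCounts.new
counts.record_lock(7, cte: true) # session lock taken via the CTE query
counts.record_lock(7)            # re-entrant session lock on the same key

first  = counts.record_unlock(7) # => [1, 0, true]
second = counts.record_unlock(7) # => [0, 0, true]
third  = counts.record_unlock(7) # => [0, 0, false]
```

The second unlock is the final one for the key and still reports cte as true, which is the signal for phantom cleanup; the third call shows that unlocking an untracked key is harmless and returns the empty triple.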