Class: GoodJob::AdvisoryLockable::AdvisoryLockCounter

Inherits:
Object
  • Object
show all
Defined in:
app/models/concerns/good_job/advisory_lockable.rb

Overview

Tracks advisory lock counts per connection per key for two purposes:

  1. CTE phantom cleanup: The CTE-based locking query can sometimes have pg_try_advisory_lock evaluated more than once for the same row by the query planner, creating phantom re-entrant session locks. When a key has been CTE-locked, the final unlock uses advisory_unlock_key! to fully release any phantoms.

  2. Xact lock awareness: Transaction-scoped locks (pg_advisory_xact_lock) are tracked because pg_locks cannot distinguish them from session locks. When an xact lock is active on the same key, advisory_unlock_key! cannot be used (it would loop forever), so cleanup falls back to looping pg_advisory_unlock until it returns false. Xact counts are pruned when no transaction is open on the connection (since xact locks release at the outermost COMMIT, not at savepoint release).

Thread safety: a leased connection is only ever accessed by the thread that leased it, so per-connection bookkeeping does not require additional synchronization.

Per-key structure: [session_count, xact_txns, cte]

xact_txns: Array of WeakRefs to AR transaction objects

Instance Method Summary collapse

Constructor Details

#initializeAdvisoryLockCounter

Returns a new instance of AdvisoryLockCounter.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 67

def initialize
  if defined?(ObjectSpace::WeakKeyMap)
    # WeakKeyMap (Ruby 3.3+) holds weak keys with strong values:
    # entries are automatically removed when the connection is GC'd.
    @map = ObjectSpace::WeakKeyMap.new
    @use_object_id = false
  else
    # On older Rubies, key by object_id with a WeakRef to the
    # connection stored alongside the data. Stale entries (where
    # the WeakRef is dead) are pruned on every access.
    @map = {}
    @use_object_id = true
  end
end

Instance Method Details

#[](conn) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 82

def [](conn)
  if @use_object_id
    entry = @map[oid(conn)]
    return unless entry

    ref, counts = entry
    return counts if ref.weakref_alive? && ref.__getobj__.equal?(conn)

    @map.delete(oid(conn))
    nil
  else
    @map[conn]
  end
end

#[]=(conn, value) ⇒ Object



97
98
99
100
101
102
103
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 97

def []=(conn, value)
  if @use_object_id
    @map[oid(conn)] = [WeakRef.new(conn), value]
  else
    @map[conn] = value
  end
end

#clear_session_locks(conn) ⇒ Object

Clear session-level lock bookkeeping for a connection, preserving any active transaction-scoped lock entries.



116
117
118
119
120
121
122
123
124
125
126
127
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 116

def clear_session_locks(conn)
  counts = self[conn]
  return unless counts

  counts.each do |key, (_session_count, xact_txns, _cte)|
    if xact_txns.empty?
      counts.delete(key)
    else
      counts[key] = [0, xact_txns, false]
    end
  end
end

#counts_for(conn, key) ⇒ Object

Returns [session_count, active_xact_count, cte] for a specific key.



178
179
180
181
182
183
184
185
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 178

def counts_for(conn, key)
  prune(conn)
  entry = self[conn]&.dig(key)
  return [0, 0, false] unless entry

  session_count, xact_txns, cte = entry
  [session_count, xact_txns.size, cte]
end

#delete(conn) ⇒ Object



105
106
107
108
109
110
111
112
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 105

def delete(conn)
  if @use_object_id
    @map.delete(oid(conn))
    prune_stale_entries
  else
    @map.delete(conn)
  end
end

#prune(conn) ⇒ Object

Remove xact entries whose transactions are no longer open (committed, rolled back, or GC’d).



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 189

def prune(conn)
  counts = self[conn]
  return unless counts

  counts.each do |key, (session_count, xact_txns, cte)|
    live_txns = xact_txns.select do |ref|
      ref.weakref_alive? && ref.open?
    rescue WeakRef::RefError
      false
    end

    if session_count.zero? && live_txns.empty?
      counts.delete(key)
    elsif live_txns.size != xact_txns.size
      counts[key] = [session_count, live_txns, cte]
    end
  end
end

#record_lock(conn, key, cte: false, xact: false) ⇒ Object

Record a lock acquisition. Per-key structure: [session_count, xact_txns, cte]

xact_txns is an Array of WeakRefs to transaction objects


148
149
150
151
152
153
154
155
156
157
158
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 148

def record_lock(conn, key, cte: false, xact: false)
  prune(conn)
  counts = self[conn] || (self[conn] = {})
  session_count, xact_txns, was_cte = counts[key] || [0, [], false]
  if xact
    xact_txns += [WeakRef.new(conn.current_transaction)]
  else
    session_count += 1
  end
  counts[key] = [session_count, xact_txns, was_cte || cte]
end

#record_unlock(conn, key) ⇒ Object

Record a lock release (session-level only; xact locks cannot be manually unlocked). Returns [new_session_count, active_xact_count, cte].



162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'app/models/concerns/good_job/advisory_lockable.rb', line 162

def record_unlock(conn, key)
  prune(conn)
  counts = self[conn]
  return [0, 0, false] unless counts

  session_count, xact_txns, cte = counts[key] || [0, [], false]
  new_session_count = [session_count - 1, 0].max
  if new_session_count.zero? && xact_txns.empty?
    counts.delete(key)
  else
    counts[key] = [new_session_count, xact_txns, cte]
  end
  [new_session_count, xact_txns.size, cte]
end