Class: GoodJob::AdvisoryLockable::AdvisoryLockCounter

Inherits:
Object
Defined in:
app/models/concerns/good_job/advisory_lockable.rb

Overview

Tracks advisory lock counts per connection per key for two purposes:

  1. CTE phantom cleanup: In the CTE-based locking query, the query planner can sometimes evaluate pg_try_advisory_lock more than once for the same row, creating phantom re-entrant session locks. When a key has been CTE-locked, the final unlock uses advisory_unlock_key! to fully release any phantoms.

  2. Xact lock awareness: Transaction-scoped locks (pg_advisory_xact_lock) are tracked because pg_locks cannot distinguish them from session locks. When an xact lock is active on the same key, advisory_unlock_key! cannot be used (it would loop forever), so cleanup falls back to looping pg_advisory_unlock until it returns false. Xact counts are pruned when no transaction is open on the connection (since xact locks release at the outermost COMMIT, not at savepoint release).

Thread safety: a leased connection is only ever accessed by the thread that leased it, so per-connection bookkeeping does not require additional synchronization.

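Per-key structure: [session_count, xact_txns, cte]

session_count: number of session-level locks recorded for the key

xact_txns: Array of WeakRefs to ActiveRecord transaction objects that hold a transaction-scoped lock on the key

cte: true once the key has been locked through the CTE-based query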
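
The two rules above can be read as a small decision table over the tracked values. The sketch below is illustrative only: `final_unlock_strategy` and the symbols it returns are hypothetical names, not part of GoodJob, and the non-CTE branch is an assumption because the overview does not describe that case.

```ruby
# Hypothetical helper (not GoodJob API): decides how the final session unlock
# for a key might be carried out, given the active xact count and the cte flag.
def final_unlock_strategy(xact_count, cte)
  # Assumption: a key that was never CTE-locked has no phantom locks to clean up.
  return :ordinary_unlock unless cte

  # An xact lock on the same key rules out advisory_unlock_key!, so the
  # fallback is to call pg_advisory_unlock until it returns false.
  return :loop_pg_advisory_unlock if xact_count.positive?

  # CTE-locked and no xact lock: fully release any phantoms.
  :advisory_unlock_key
end

final_unlock_strategy(0, true)  # CTE-locked, no xact lock
final_unlock_strategy(1, true)  # CTE-locked, xact lock active on the key
```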

Instance Method Summary

Constructor Details

#initialize ⇒ AdvisoryLockCounter

Returns a new instance of AdvisoryLockCounter.



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 50

def initialize
  if defined?(ObjectSpace::WeakKeyMap)
    # WeakKeyMap (Ruby 3.3+) holds weak keys with strong values:
    # entries are automatically removed when the connection is GC'd.
    @map = ObjectSpace::WeakKeyMap.new
    @use_object_id = false
  else
    # On older Rubies, key by object_id with a WeakRef to the
    # connection stored alongside the data. Stale entries (where
    # the WeakRef is dead) are pruned on every access.
    @map = {}
    @use_object_id = true
  end
end
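
As a rough illustration of the two storage strategies, the following sketch stores and reads back one connection's counts using whichever mechanism the running Ruby supports. The `conn` object is a plain stand-in for a database connection, and keying the fallback Hash by `conn.object_id` is an assumption, since the `oid` helper is not shown on this page.

```ruby
require "weakref"

conn = Object.new                     # stand-in for a leased connection (assumption)
counts = { 42 => [1, [], false] }     # key => [session_count, xact_txns, cte]

if defined?(ObjectSpace::WeakKeyMap)
  # Ruby 3.3+: the map holds the connection weakly and the counts strongly.
  map = ObjectSpace::WeakKeyMap.new
  map[conn] = counts
  found = map[conn]
else
  # Older Rubies: key by object_id and keep a WeakRef next to the data so a
  # dead or mismatched connection can be detected on lookup.
  map = {}
  map[conn.object_id] = [WeakRef.new(conn), counts]
  ref, stored = map[conn.object_id]
  found = stored if ref.weakref_alive? && ref.__getobj__.equal?(conn)
end
```

Either branch leaves `found` pointing at the same Hash that was stored, so callers do not need to know which strategy is in use.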

Instance Method Details

#[](conn) ⇒ Object



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 65

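def [](conn)
  if @use_object_id
    entry = @map[oid(conn)]
    return unless entry

    ref, counts = entry
    # Trust the entry only if its WeakRef is alive and still points at this exact connection.
    return counts if ref.weakref_alive? && ref.__getobj__.equal?(conn)

    # Otherwise the entry is stale, so drop it.
    @map.delete(oid(conn))
    nil
  else
    @map[conn]
  end
end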

#[]=(conn, value) ⇒ Object



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 80

def []=(conn, value)
  if @use_object_id
    @map[oid(conn)] = [WeakRef.new(conn), value]
  else
    @map[conn] = value
  end
end

#clear_session_locks(conn) ⇒ Object

Clear session-level lock bookkeeping for a connection, preserving any active transaction-scoped lock entries.



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 99

def clear_session_locks(conn)
  counts = self[conn]
  return unless counts

  counts.each do |key, (_session_count, xact_txns, _cte)|
    if xact_txns.empty?
      counts.delete(key)
    else
      counts[key] = [0, xact_txns, false]
    end
  end
end
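
To make the effect concrete, this sketch applies the same loop to a hand-built counts Hash. The keys and the `:txn_ref` placeholder (standing in for a WeakRef to a transaction) are made up for illustration.

```ruby
# Hypothetical per-connection counts: key => [session_count, xact_txns, cte]
counts = {
  101 => [2, [], true],           # session locks only, CTE-locked
  202 => [1, [:txn_ref], false]   # one session lock plus one tracked xact entry
}

counts.each do |key, (_session_count, xact_txns, _cte)|
  if xact_txns.empty?
    counts.delete(key)                   # nothing left to track for this key
  else
    counts[key] = [0, xact_txns, false]  # keep only the xact bookkeeping
  end
end

# Key 101 is gone; key 202 keeps its xact entry with session state reset.
```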

#counts_for(conn, key) ⇒ Object

Returns [session_count, active_xact_count, cte] for a specific key.



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 161

def counts_for(conn, key)
  prune(conn)
  entry = self[conn]&.dig(key)
  return [0, 0, false] unless entry

  session_count, xact_txns, cte = entry
  [session_count, xact_txns.size, cte]
end

#delete(conn) ⇒ Object



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 88

def delete(conn)
  if @use_object_id
    @map.delete(oid(conn))
    prune_stale_entries
  else
    @map.delete(conn)
  end
end

#prune(conn) ⇒ Object

Remove xact entries whose transactions are no longer open (committed, rolled back, or GC’d).



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 172

def prune(conn)
  counts = self[conn]
  return unless counts

  counts.each do |key, (session_count, xact_txns, cte)|
    live_txns = xact_txns.select do |ref|
      ref.weakref_alive? && ref.open?
    rescue WeakRef::RefError
      false
    end

    if session_count.zero? && live_txns.empty?
      counts.delete(key)
    elsif live_txns.size != xact_txns.size
      counts[key] = [session_count, live_txns, cte]
    end
  end
end
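
The filtering step can be tried in isolation. In this sketch `FakeTxn` is a hypothetical stand-in for an ActiveRecord transaction object; the only behavior assumed is that it responds to `open?`, which the WeakRef forwards to the referenced object.

```ruby
require "weakref"

# Hypothetical transaction stand-in that only knows whether it is open.
class FakeTxn
  def initialize(open)
    @open = open
  end

  def open?
    @open
  end
end

open_txn = FakeTxn.new(true)
closed_txn = FakeTxn.new(false)
xact_txns = [WeakRef.new(open_txn), WeakRef.new(closed_txn)]

# Keep only references that are still alive and whose transaction is open.
live_txns = xact_txns.select do |ref|
  ref.weakref_alive? && ref.open?
rescue WeakRef::RefError
  false # the object was collected between the two checks
end
```

Here `live_txns` retains only the reference to `open_txn`. The `rescue` covers the case where the object is collected after `weakref_alive?` returns true but before `open?` is forwarded.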

#record_lock(conn, key, cte: false, xact: false) ⇒ Object

Record a lock acquisition. Per-key structure: [session_count, xact_txns, cte]

xact_txns is an Array of WeakRefs to transaction objects


# File 'app/models/concerns/good_job/advisory_lockable.rb', line 131

def record_lock(conn, key, cte: false, xact: false)
  prune(conn)
  counts = self[conn] || (self[conn] = {})
  session_count, xact_txns, was_cte = counts[key] || [0, [], false]
  if xact
    # Track the current transaction weakly so the entry can be pruned once it closes.
    xact_txns += [WeakRef.new(conn.current_transaction)]
  else
    session_count += 1
  end
  # The cte flag is sticky: once a key has been CTE-locked it stays marked.
  counts[key] = [session_count, xact_txns, was_cte || cte]
end
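
The way the per-key tuple evolves across several acquisitions can be traced with a simplified stand-in. The `record` helper below is hypothetical: it works on a plain Hash, skips pruning, and takes the transaction as an explicit `txn:` argument (a Symbol here) instead of wrapping `conn.current_transaction` in a WeakRef.

```ruby
# Hypothetical, simplified version of the bookkeeping in record_lock.
def record(counts, key, cte: false, xact: false, txn: nil)
  session_count, xact_txns, was_cte = counts[key] || [0, [], false]
  if xact
    xact_txns += [txn]     # transaction-scoped lock: remember its transaction
  else
    session_count += 1     # session-level lock: bump the counter
  end
  counts[key] = [session_count, xact_txns, was_cte || cte]
end

counts = {}
record(counts, 7, cte: true)                # [1, [], true]
record(counts, 7)                           # [2, [], true]   (cte stays true)
record(counts, 7, xact: true, txn: :txn_a)  # [2, [:txn_a], true]
```

Note that the second call passes `cte: false` yet the flag remains true, because the stored value is the logical OR of the previous flag and the new one.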

#record_unlock(conn, key) ⇒ Object

Record a lock release (session-level only; xact locks cannot be manually unlocked). Returns [new_session_count, active_xact_count, cte].



# File 'app/models/concerns/good_job/advisory_lockable.rb', line 145

def record_unlock(conn, key)
  prune(conn)
  counts = self[conn]
  return [0, 0, false] unless counts

  session_count, xact_txns, cte = counts[key] || [0, [], false]
  new_session_count = [session_count - 1, 0].max
  if new_session_count.zero? && xact_txns.empty?
    counts.delete(key)
  else
    counts[key] = [new_session_count, xact_txns, cte]
  end
  [new_session_count, xact_txns.size, cte]
end
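
A simplified trace shows the three outcomes: an entry kept because an xact lock is still tracked, an entry removed, and an unlock of an untracked key. The `unlock` helper is a hypothetical stand-in that operates on a plain Hash and omits pruning; `:txn_a` stands in for a WeakRef to a transaction.

```ruby
# Hypothetical, simplified version of the bookkeeping in record_unlock.
def unlock(counts, key)
  session_count, xact_txns, cte = counts[key] || [0, [], false]
  new_session_count = [session_count - 1, 0].max   # never drops below zero
  if new_session_count.zero? && xact_txns.empty?
    counts.delete(key)                             # nothing left to track
  else
    counts[key] = [new_session_count, xact_txns, cte]
  end
  [new_session_count, xact_txns.size, cte]
end

counts = { 7 => [1, [:txn_a], true], 8 => [1, [], false] }

kept    = unlock(counts, 7)  # [0, 1, true]; entry stays because of the xact lock
removed = unlock(counts, 8)  # [0, 0, false]; entry is deleted
unknown = unlock(counts, 9)  # [0, 0, false]; untracked key, count clamped at zero
```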