Class: Pgbus::JobLock
- Defined in:
- app/models/pgbus/job_lock.rb
Constant Summary collapse
- STATES =
States:
queued — lock held from enqueue time (:until_executed), no worker yet executing — lock held by an active worker process %w[queued executing].freeze
Class Method Summary collapse
-
.acquire!(lock_key, job_class:, ttl:, job_id: nil, state: "queued", owner_pid: nil, owner_hostname: nil) ⇒ Object
Atomically try to acquire a lock.
-
.claim_for_execution!(lock_key, owner_pid:, owner_hostname:, ttl:) ⇒ Object
Transition a queued lock to executing state and claim ownership.
-
.cleanup_expired! ⇒ Object
Last-resort cleanup: delete locks whose expires_at has passed.
-
.locked?(lock_key) ⇒ Boolean
Check if a lock is currently held (regardless of expiry — reaper handles orphans).
-
.reap_orphaned! ⇒ Object
Reap orphaned locks whose owner is no longer alive, plus stale queued locks that were never claimed by a worker.
-
.release!(lock_key) ⇒ Object
Release a lock by key.
Class Method Details
.acquire!(lock_key, job_class:, ttl:, job_id: nil, state: "queued", owner_pid: nil, owner_hostname: nil) ⇒ Object
Atomically try to acquire a lock. Cleans up expired locks for this key first (crash recovery at acquire time). Returns true if acquired, false if already locked.
Uses raw SQL on the hot path to minimize ActiveRecord allocations (~29 objects vs ~304 per acquire+release cycle with AR query builder).
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'app/models/pgbus/job_lock.rb', line 22 def self.acquire!(lock_key, job_class:, ttl:, job_id: nil, state: "queued", owner_pid: nil, owner_hostname: nil) # rubocop:disable Naming/PredicateMethod expires_at = Time.current + ttl # Remove any expired lock for this key inline (last-resort TTL recovery) connection.exec_delete( "DELETE FROM #{table_name} WHERE lock_key = $1 AND expires_at < $2", "JobLock Expire", [lock_key, Time.current] ) result = connection.exec_query( "INSERT INTO #{table_name} (lock_key, job_class, job_id, state, owner_pid, owner_hostname, expires_at) " \ "VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (lock_key) DO NOTHING RETURNING id", "JobLock Acquire", [lock_key, job_class, job_id, state, owner_pid, owner_hostname, expires_at] ) result.rows.any? end |
.claim_for_execution!(lock_key, owner_pid:, owner_hostname:, ttl:) ⇒ Object
Transition a queued lock to executing state and claim ownership. Called when a worker starts executing a job that was locked at enqueue time.
41 42 43 44 45 46 47 |
# File 'app/models/pgbus/job_lock.rb', line 41 def self.claim_for_execution!(lock_key, owner_pid:, owner_hostname:, ttl:) connection.exec_update( "UPDATE #{table_name} SET state = $1, owner_pid = $2, owner_hostname = $3, expires_at = $4 " \ "WHERE lock_key = $5", "JobLock Claim", ["executing", owner_pid, owner_hostname, Time.current + ttl, lock_key] ) end |
.cleanup_expired! ⇒ Object
Last-resort cleanup: delete locks whose expires_at has passed. This only fires when the reaper itself can’t run (e.g., entire supervisor dead).
94 95 96 |
# File 'app/models/pgbus/job_lock.rb', line 94 def self.cleanup_expired! expired.delete_all end |
.locked?(lock_key) ⇒ Boolean
Check if a lock is currently held (regardless of expiry — reaper handles orphans).
58 59 60 61 62 63 |
# File 'app/models/pgbus/job_lock.rb', line 58 def self.locked?(lock_key) result = connection.select_value( "SELECT 1 FROM #{table_name} WHERE lock_key = $1 LIMIT 1", "JobLock Check", [lock_key] ) !result.nil? end |
.reap_orphaned! ⇒ Object
Reap orphaned locks whose owner is no longer alive, plus stale queued locks that were never claimed by a worker. Returns the total number of orphaned locks released.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'app/models/pgbus/job_lock.rb', line 68 def self.reap_orphaned! reaped = 0 # 1. Executing locks whose owner process has no healthy heartbeat alive_workers = ProcessEntry .where("last_heartbeat_at >= ?", Time.current - Process::Heartbeat::ALIVE_THRESHOLD) .pluck(:pid, :hostname) orphaned_executing = executing.select do |lock| alive_workers.none? { |pid, hostname| pid == lock.owner_pid && hostname == lock.owner_hostname } end reaped += where(id: orphaned_executing.map(&:id)).delete_all if orphaned_executing.any? # 2. Queued locks older than the visibility timeout that were never # claimed. These are left behind when enqueue fails after lock # acquisition (e.g. network error, process crash). threshold = Pgbus.configuration.visibility_timeout stale_queued = queued_locks.where("locked_at < ?", Time.current - threshold) reaped += stale_queued.delete_all if stale_queued.exists? reaped end |
.release!(lock_key) ⇒ Object
Release a lock by key.
50 51 52 53 54 55 |
# File 'app/models/pgbus/job_lock.rb', line 50 def self.release!(lock_key) connection.exec_delete( "DELETE FROM #{table_name} WHERE lock_key = $1", "JobLock Release", [lock_key] ) end |