Module: Pgbus::Uniqueness
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/pgbus/uniqueness.rb
Overview
Job uniqueness guarantees: prevent duplicate jobs from running concurrently.
Unlike concurrency limits (which allow N concurrent jobs for the same key), uniqueness ensures AT MOST ONE job with a given key exists in the system at any time — from enqueue through completion.
Lock lifecycle (advisory lock + thin lookup table):
1. Enqueue: pg_advisory_xact_lock serializes concurrent attempts,
then INSERT INTO pgbus_uniqueness_keys ON CONFLICT DO NOTHING.
The lock row lives as long as the job is in the queue or executing.
2. Execution: PGMQ's visibility timeout is the execution lock —
no separate claim_for_execution step needed.
3. Completion/DLQ: DELETE FROM pgbus_uniqueness_keys WHERE lock_key = ?.
4. Crash recovery: if a worker dies, VT expires, the message becomes
readable again. The uniqueness key row stays (correctly — the job
hasn't finished). The next worker picks it up and executes.
Strategies:
:until_executed — Lock acquired at enqueue, held through execution, released on
completion or DLQ. Prevents duplicate enqueue AND duplicate execution.
:while_executing — Lock acquired at execution start, released on completion.
Allows duplicate enqueue (multiple copies in queue) but only one
executes at a time.
Usage:
class ImportOrderJob < ApplicationJob
ensures_uniqueness strategy: :until_executed,
key: ->(order_id) { "import-order-#{order_id}" },
on_conflict: :reject
def perform(order_id)
# Only one instance of this job per order_id can exist at a time
end
end
Constant Summary collapse
- METADATA_KEY =
"pgbus_uniqueness_key"- STRATEGY_KEY =
"pgbus_uniqueness_strategy"- TTL_KEY =
"pgbus_uniqueness_lock_ttl"- DEFAULT_LOCK_TTL =
TTL is kept for metadata compatibility but no longer drives lock expiry. The lock exists until the job completes or is dead-lettered.
24 * 60 * 60
- VALID_STRATEGIES =
%i[until_executed while_executing].freeze
- VALID_CONFLICTS =
%i[reject discard log].freeze
Class Method Summary collapse
-
.acquire_enqueue_lock(key, active_job, queue_name: nil, msg_id: nil) ⇒ Object
Acquire the uniqueness lock at enqueue time (:until_executed only).
-
.acquire_execution_lock(key, payload) ⇒ Object
Acquire the uniqueness lock at execution time (:while_executing only).
- .extract_key(payload) ⇒ Object
- .extract_strategy(payload) ⇒ Object
- .inject_metadata(active_job, payload_hash) ⇒ Object
-
.release_lock(key) ⇒ Object
Release the uniqueness lock after execution completes.
- .resolve_key(active_job) ⇒ Object
- .uniqueness_config(active_job) ⇒ Object
Class Method Details
.acquire_enqueue_lock(key, active_job, queue_name: nil, msg_id: nil) ⇒ Object
Acquire the uniqueness lock at enqueue time (:until_executed only). Uses pg_advisory_xact_lock to serialize concurrent attempts. Returns :acquired, :locked, or :no_lock.
127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/pgbus/uniqueness.rb', line 127 def acquire_enqueue_lock(key, active_job, queue_name: nil, msg_id: nil) config = uniqueness_config(active_job) return :no_lock unless config return :no_lock unless config[:strategy] == :until_executed acquired = if msg_id && queue_name UniquenessKey.acquire!(key, queue_name: queue_name, msg_id: msg_id) else # Pre-produce check: use advisory lock + ON CONFLICT UniquenessKey.acquire!(key, queue_name: queue_name || "pending", msg_id: msg_id || 0) end acquired ? :acquired : :locked end |
.acquire_execution_lock(key, payload) ⇒ Object
Acquire the uniqueness lock at execution time (:while_executing only). Returns true if acquired, false if another instance is running.
143 144 145 146 147 148 149 |
# File 'lib/pgbus/uniqueness.rb', line 143 def acquire_execution_lock(key, payload) strategy = extract_strategy(payload) return true unless strategy == :while_executing queue_name = payload["queue_name"] || "unknown" UniquenessKey.acquire!(key, queue_name: queue_name, msg_id: 0) end |
.extract_key(payload) ⇒ Object
110 111 112 |
# File 'lib/pgbus/uniqueness.rb', line 110 def extract_key(payload) payload&.dig(METADATA_KEY) end |
.extract_strategy(payload) ⇒ Object
114 115 116 |
# File 'lib/pgbus/uniqueness.rb', line 114 def extract_strategy(payload) payload&.dig(STRATEGY_KEY)&.to_sym end |
.inject_metadata(active_job, payload_hash) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/pgbus/uniqueness.rb', line 96 def (active_job, payload_hash) config = uniqueness_config(active_job) return payload_hash unless config key = resolve_key(active_job) return payload_hash unless key payload_hash.merge( METADATA_KEY => key, STRATEGY_KEY => config[:strategy].to_s, TTL_KEY => config[:lock_ttl] ) end |
.release_lock(key) ⇒ Object
Release the uniqueness lock after execution completes.
152 153 154 155 156 |
# File 'lib/pgbus/uniqueness.rb', line 152 def release_lock(key) return unless key UniquenessKey.release!(key) end |
.resolve_key(active_job) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/pgbus/uniqueness.rb', line 78 def resolve_key(active_job) config = uniqueness_config(active_job) return nil unless config args = active_job.arguments last = args.last key = if last.is_a?(Hash) && last.each_key.all?(Symbol) config[:key].call(*args[...-1], **last) else config[:key].call(*args) end # Automatically serialize GlobalID-compatible objects (e.g. ActiveRecord models) # so users can pass model instances directly without manual .to_global_id.to_s key = key.to_global_id.to_s if key.respond_to?(:to_global_id) key end |
.uniqueness_config(active_job) ⇒ Object
118 119 120 121 122 |
# File 'lib/pgbus/uniqueness.rb', line 118 def uniqueness_config(active_job) return nil unless active_job.class.respond_to?(:pgbus_uniqueness) active_job.class.pgbus_uniqueness end |