Module: Pgbus::Uniqueness

Extended by:
ActiveSupport::Concern
Defined in:
lib/pgbus/uniqueness.rb

Overview

Job uniqueness guarantees: prevent duplicate jobs from running concurrently.

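Unlike concurrency limits (which allow N concurrent jobs for the same key), uniqueness allows at most one. With :until_executed, at most one job with a given key exists in the system at any time, from enqueue through completion. With :while_executing, duplicates may wait in the queue, but at most one executes at a time.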

Lock lifecycle (advisory lock + thin lookup table):

1. Enqueue: pg_advisory_xact_lock serializes concurrent attempts,
   then INSERT INTO pgbus_uniqueness_keys ON CONFLICT DO NOTHING.
   The lock row lives as long as the job is in the queue or executing.
2. Execution: PGMQ's visibility timeout is the execution lock —
   no separate claim_for_execution step needed.
3. Completion/DLQ: DELETE FROM pgbus_uniqueness_keys WHERE lock_key = ?.
4. Crash recovery: if a worker dies, VT expires, the message becomes
   readable again. The uniqueness key row stays (correctly — the job
   hasn't finished). The next worker picks it up and executes.
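The lifecycle above can be modelled without a database. The following is a minimal sketch, not the library's implementation: `InMemoryUniquenessKeys` is a hypothetical class in which a `Mutex` stands in for `pg_advisory_xact_lock` and a `Set` stands in for the `pgbus_uniqueness_keys` table.

```ruby
require "set"

# Hypothetical in-memory stand-in for the pgbus_uniqueness_keys table.
# The Mutex plays the role of the advisory lock; the Set plays the role
# of a table with a unique constraint on lock_key.
class InMemoryUniquenessKeys
  def initialize
    @mutex = Mutex.new
    @keys = Set.new
  end

  # Models INSERT ... ON CONFLICT DO NOTHING: true if the row was inserted.
  def acquire(lock_key)
    @mutex.synchronize { !@keys.add?(lock_key).nil? }
  end

  # Models DELETE FROM pgbus_uniqueness_keys WHERE lock_key = ?
  def release(lock_key)
    @mutex.synchronize { !@keys.delete?(lock_key).nil? }
  end

  def held?(lock_key)
    @mutex.synchronize { @keys.include?(lock_key) }
  end
end

table = InMemoryUniquenessKeys.new
first = table.acquire("import-order-42")       # enqueue: row inserted
second = table.acquire("import-order-42")      # duplicate enqueue: conflict
still_held = table.held?("import-order-42")    # a worker crash leaves the row in place
table.release("import-order-42")               # completion or DLQ
third = table.acquire("import-order-42")       # key is free again
```

In this model a crash simply never calls `release`, which is why the key stays held until some worker finishes the job.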

Strategies:

:until_executed  — Lock acquired at enqueue, held through execution, released on
                   completion or DLQ. Prevents duplicate enqueue AND duplicate execution.

:while_executing — Lock acquired at execution start, released on completion.
                   Allows duplicate enqueue (multiple copies in queue) but only one
                   executes at a time.
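The difference between the two strategies comes down to which phase takes the lock. A rough sketch, assuming a plain `Set` as a hypothetical stand-in for the key table; the two lambdas mirror the return conventions documented for acquire_enqueue_lock and acquire_execution_lock but are not the library's code:

```ruby
require "set"

held = Set.new # hypothetical stand-in for the uniqueness key table

# Enqueue-time check: only :until_executed takes the lock here.
enqueue = lambda do |strategy, key|
  next :no_lock unless strategy == :until_executed

  held.add?(key) ? :acquired : :locked
end

# Execution-time check: only :while_executing takes the lock here.
execute = lambda do |strategy, key|
  next true unless strategy == :while_executing

  !held.add?(key).nil?
end

results = [
  enqueue.call(:until_executed, "k1"),   # first enqueue takes the lock
  enqueue.call(:until_executed, "k1"),   # duplicate enqueue is refused
  enqueue.call(:while_executing, "k2"),  # enqueue is never blocked
  execute.call(:while_executing, "k2"),  # first execution takes the lock
  execute.call(:while_executing, "k2"),  # concurrent execution is refused
  execute.call(:until_executed, "k1")    # lock already held since enqueue
]
```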

Usage:

class ImportOrderJob < ApplicationJob
  ensures_uniqueness strategy: :until_executed,
                     key: ->(order_id) { "import-order-#{order_id}" },
                     on_conflict: :reject

  def perform(order_id)
    # Only one instance of this job per order_id can exist at a time
  end
end

Constant Summary

METADATA_KEY =
"pgbus_uniqueness_key"
STRATEGY_KEY =
"pgbus_uniqueness_strategy"
TTL_KEY =
"pgbus_uniqueness_lock_ttl"
DEFAULT_LOCK_TTL =
24 * 60 * 60

The TTL is kept for metadata compatibility but no longer drives lock expiry. The lock exists until the job completes or is dead-lettered.

VALID_STRATEGIES =
%i[until_executed while_executing].freeze
VALID_CONFLICTS =
%i[reject discard log].freeze

Class Method Details

.acquire_enqueue_lock(key, active_job, queue_name: nil, msg_id: nil) ⇒ Object

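Acquire the uniqueness lock at enqueue time (:until_executed only). Uses pg_advisory_xact_lock to serialize concurrent attempts. Returns :acquired if the lock was taken, :locked if the key is already held, or :no_lock if the job has no uniqueness config or uses a different strategy.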



# File 'lib/pgbus/uniqueness.rb', line 127

def acquire_enqueue_lock(key, active_job, queue_name: nil, msg_id: nil)
  config = uniqueness_config(active_job)
  return :no_lock unless config
  return :no_lock unless config[:strategy] == :until_executed

  acquired = if msg_id && queue_name
               UniquenessKey.acquire!(key, queue_name: queue_name, msg_id: msg_id)
             else
               # Pre-produce check: use advisory lock + ON CONFLICT
               UniquenessKey.acquire!(key, queue_name: queue_name || "pending", msg_id: msg_id || 0)
             end
  acquired ? :acquired : :locked
end

.acquire_execution_lock(key, payload) ⇒ Object

Acquire the uniqueness lock at execution time (:while_executing only). Returns true if the lock was acquired, or if the payload's strategy is not :while_executing and no lock is needed. Returns false if another instance is running.



# File 'lib/pgbus/uniqueness.rb', line 143

def acquire_execution_lock(key, payload)
  strategy = extract_strategy(payload)
  return true unless strategy == :while_executing

  queue_name = payload["queue_name"] || "unknown"
  UniquenessKey.acquire!(key, queue_name: queue_name, msg_id: 0)
end

.extract_key(payload) ⇒ Object



# File 'lib/pgbus/uniqueness.rb', line 110

def extract_key(payload)
  payload&.dig(METADATA_KEY)
end

.extract_strategy(payload) ⇒ Object



# File 'lib/pgbus/uniqueness.rb', line 114

def extract_strategy(payload)
  payload&.dig(STRATEGY_KEY)&.to_sym
end
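A short illustration of how the two extractors behave on present, absent, and nil payloads. The module name `ExtractSketch` and the `"job_class"` payload field are assumptions made for the example; the method bodies and constant values are copied from this page.

```ruby
# Hypothetical wrapper module so the example runs on its own.
module ExtractSketch
  METADATA_KEY = "pgbus_uniqueness_key"
  STRATEGY_KEY = "pgbus_uniqueness_strategy"

  module_function

  def extract_key(payload)
    payload&.dig(METADATA_KEY)
  end

  def extract_strategy(payload)
    payload&.dig(STRATEGY_KEY)&.to_sym
  end
end

# Assumed payload shape; only the two pgbus_* keys matter here.
payload = {
  "job_class" => "ImportOrderJob",
  "pgbus_uniqueness_key" => "import-order-42",
  "pgbus_uniqueness_strategy" => "until_executed"
}

key = ExtractSketch.extract_key(payload)            # stored key string
strategy = ExtractSketch.extract_strategy(payload)  # stored string converted to a Symbol
plain = ExtractSketch.extract_strategy({ "job_class" => "PlainJob" }) # no metadata present
missing = ExtractSketch.extract_key(nil)            # safe navigation tolerates a nil payload
```

Because the strategy is stored as a string and converted back with `to_sym`, the payload stays serializable while callers can still compare against `:until_executed` or `:while_executing`.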

.inject_metadata(active_job, payload_hash) ⇒ Object



# File 'lib/pgbus/uniqueness.rb', line 96

def inject_metadata(active_job, payload_hash)
  config = uniqueness_config(active_job)
  return payload_hash unless config

  key = resolve_key(active_job)
  return payload_hash unless key

  payload_hash.merge(
    METADATA_KEY => key,
    STRATEGY_KEY => config[:strategy].to_s,
    TTL_KEY => config[:lock_ttl]
  )
end
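The merge step can be tried in isolation. This sketch uses a hypothetical `InjectSketch.inject` that takes the resolved key and config directly instead of an ActiveJob instance; the `"job_class"` field and the config hash values are assumptions for illustration.

```ruby
# Hypothetical wrapper module so the example runs on its own.
module InjectSketch
  METADATA_KEY = "pgbus_uniqueness_key"
  STRATEGY_KEY = "pgbus_uniqueness_strategy"
  TTL_KEY = "pgbus_uniqueness_lock_ttl"

  module_function

  # Simplified variant of inject_metadata: key and config are passed in
  # rather than resolved from an ActiveJob instance.
  def inject(payload_hash, key:, config:)
    return payload_hash unless config && key

    payload_hash.merge(
      METADATA_KEY => key,
      STRATEGY_KEY => config[:strategy].to_s,
      TTL_KEY => config[:lock_ttl]
    )
  end
end

original = { "job_class" => "ImportOrderJob" }
config = { strategy: :until_executed, lock_ttl: 24 * 60 * 60 }

enriched = InjectSketch.inject(original, key: "import-order-42", config: config)
untouched = InjectSketch.inject(original, key: nil, config: config)
```

`Hash#merge` returns a new hash, so the caller's payload is not mutated. When no key or config is available, the same payload object is returned unchanged.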

.release_lock(key) ⇒ Object

Release the uniqueness lock after execution completes or the job is dead-lettered. Does nothing when key is nil.



# File 'lib/pgbus/uniqueness.rb', line 152

def release_lock(key)
  return unless key

  UniquenessKey.release!(key)
end

.resolve_key(active_job) ⇒ Object



# File 'lib/pgbus/uniqueness.rb', line 78

def resolve_key(active_job)
  config = uniqueness_config(active_job)
  return nil unless config

  args = active_job.arguments
  last = args.last
  key = if last.is_a?(Hash) && last.each_key.all?(Symbol)
          config[:key].call(*args[...-1], **last)
        else
          config[:key].call(*args)
        end

  # Automatically serialize GlobalID-compatible objects (e.g. ActiveRecord models)
  # so users can pass model instances directly without manual .to_global_id.to_s
  key = key.to_global_id.to_s if key.respond_to?(:to_global_id)
  key
end
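The argument handling in resolve_key decides whether a trailing Hash is passed to the key proc as keyword arguments. A minimal sketch of that branch, using a hypothetical helper `call_key_proc` and made-up key procs (requires Ruby 2.7+ for the beginless range):

```ruby
# Hypothetical helper mirroring the argument handling in resolve_key.
def call_key_proc(key_proc, args)
  last = args.last
  if last.is_a?(Hash) && last.each_key.all?(Symbol)
    # A trailing Hash with only Symbol keys is splatted as keyword arguments.
    key_proc.call(*args[...-1], **last)
  else
    # Anything else is passed positionally, unchanged.
    key_proc.call(*args)
  end
end

positional = ->(order_id) { "import-order-#{order_id}" }
with_kwargs = ->(order_id, region:) { "import-order-#{region}-#{order_id}" }
takes_hash = ->(order_id, opts) { "import-order-#{order_id}-#{opts['region']}" }

a = call_key_proc(positional, [42])
b = call_key_proc(with_kwargs, [42, { region: "eu" }])
c = call_key_proc(takes_hash, [42, { "region" => "eu" }])
```

A Hash with String keys stays positional, so a key proc only receives keyword arguments when the job itself was enqueued with Symbol-keyed options.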

.uniqueness_config(active_job) ⇒ Object



# File 'lib/pgbus/uniqueness.rb', line 118

def uniqueness_config(active_job)
  return nil unless active_job.class.respond_to?(:pgbus_uniqueness)

  active_job.class.pgbus_uniqueness
end