Module: Wurk::Unique

Defined in:
lib/wurk/unique.rb

Overview

Sidekiq Enterprise unique jobs. Best-effort dedup at enqueue time keyed by a SHA256 digest of ‘[class, queue, args]` (overridable via `sidekiq_unique_context`). Three lock-release strategies:

* `unique_until: :success` (default) — lock retained through retries;
  server middleware DELs it on successful perform. Surviving across
  a process crash is bounded by `unique_for` TTL.
* `unique_until: :start` — server middleware DELs the lock right
  *before* invoking perform; a duplicate can be enqueued while the
  first is running.

Wire-compat (§3.9): single-key Redis layout — ‘unique:<sha256>` STRING holding the owning JID. Scheduled jobs extend the TTL by the delay so the lock covers the entire wait+execution window (§3.4).

Spec: docs/target/sidekiq-ent.md §3.

Defined Under Namespace

Classes: ClientMiddleware, ServerMiddleware

Constant Summary collapse

KEY_PREFIX =
'unique:'
DEFAULT_UNTIL =
:success
VALID_UNTIL =
%i[success start].freeze

Class Method Summary collapse

Class Method Details

.coerce_ttl(value) ⇒ Object

Coerce ‘unique_for` to a numeric seconds value. Accepts Integer, Numeric, ActiveSupport::Duration (any `to_i`-respondent), or `false` (skip). Returns nil when uniqueness should be skipped.



100
101
102
103
104
105
106
107
# File 'lib/wurk/unique.rb', line 100

def self.coerce_ttl(value)
  return nil if value.nil? || value == false
  return value if value.is_a?(Integer) && value.positive?
  return value.to_i if value.is_a?(Numeric)
  return value.to_i if duration_like?(value)

  nil
end

.disable!Object

Test helper — not part of the public Sidekiq surface. Clears the flag so per-test enable!/disable! does not leak across runs.



46
47
48
49
# File 'lib/wurk/unique.rb', line 46

def disable!
  @enabled = false
  nil
end

.duration_like?(value) ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
112
113
# File 'lib/wurk/unique.rb', line 109

def self.duration_like?(value)
  return false unless value.respond_to?(:to_i)

  value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
end

.enable!Object

rubocop:disable Naming/PredicateMethod



38
39
40
41
42
# File 'lib/wurk/unique.rb', line 38

def enable! # rubocop:disable Naming/PredicateMethod
  @enabled = true
  register_middleware!
  true
end

.enabled?Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/wurk/unique.rb', line 34

def enabled?
  @enabled == true
end

.lock_key(klass, queue, args) ⇒ Object

Compute the lock key for an arbitrary ‘(queue, klass, args)` triple. Used by both the client middleware and the public `locked?` probe so they cannot drift.



54
55
56
57
# File 'lib/wurk/unique.rb', line 54

def lock_key(klass, queue, args)
  context = [klass.to_s, queue.to_s, args]
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

.lock_key_for(job) ⇒ Object

Compute the lock key from a job payload, honoring ‘sidekiq_unique_context` when the worker class is loaded and defines it.



62
63
64
65
# File 'lib/wurk/unique.rb', line 62

def lock_key_for(job)
  context = unique_context(job)
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

.locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?

Returns owning jid, or nil when the lock is free.

Returns:

  • (String, nil)

    owning jid, or nil when the lock is free.



120
121
122
123
124
# File 'lib/wurk/unique.rb', line 120

def self.locked?(queue_or_klass, klass_or_args = nil, args = nil)
  queue, klass, payload = normalize_locked_args(queue_or_klass, klass_or_args, args)
  key = lock_key(klass, queue, payload)
  Wurk.redis { |c| c.call('GET', key) }
end

.unique_context(job) ⇒ Object

Default: ‘[class, queue, args]`. Workers may override by defining `self.sidekiq_unique_context(job)` returning any JSON-serializable value (e.g. a subset of args). Spec §3.5.



70
71
72
73
74
75
76
77
# File 'lib/wurk/unique.rb', line 70

def unique_context(job)
  klass = resolve_class(job['class'])
  if klass.respond_to?(:sidekiq_unique_context)
    klass.sidekiq_unique_context(job)
  else
    [job['class'], job['queue'], job['args']]
  end
end