Module: Wurk::Unique
- Defined in:
- lib/wurk/unique.rb
Overview
Sidekiq Enterprise unique jobs. Best-effort dedup at enqueue time keyed by a SHA256 digest of `[class, queue, args]` (overridable via `sidekiq_unique_context`). Two lock-release strategies:
* `unique_until: :success` (default): the lock is retained through retries;
server middleware DELs it on successful perform. A lock orphaned by
a process crash lasts at most the `unique_for` TTL.
* `unique_until: :start` — server middleware DELs the lock right
*before* invoking perform; a duplicate can be enqueued while the
first is running.
A job that dies automatically (retries exhausted / discarded) releases its lock via a death handler; manual UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs).
Wire-compat (§3.9): single-key Redis layout, a `unique:<sha256>` STRING holding the owning JID. Scheduled jobs extend the TTL by the delay so the lock covers the entire wait+execution window (§3.4).
Spec: docs/target/sidekiq-ent.md §3.
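As a rough illustration of the TTL extension for scheduled jobs, the sketch below adds the remaining delay to `unique_for`. The helper name `effective_lock_ttl`, its keyword arguments, and the rounding with `ceil` are assumptions made for this example; they are not taken from the Wurk source.

```ruby
# Hypothetical helper, not part of Wurk: lock TTL in seconds for one job.
# unique_for - configured uniqueness window in seconds
# at         - epoch time at which a scheduled job becomes due (nil if immediate)
# now        - current epoch time, injectable so the example is deterministic
def effective_lock_ttl(unique_for, at: nil, now: Time.now.to_f)
  delay = at ? [at - now, 0].max : 0 # a due time in the past adds nothing
  (unique_for + delay).ceil          # assumption: round up to whole seconds
end

effective_lock_ttl(600, now: 1_000.0)              # immediate job
effective_lock_ttl(600, at: 1_300.0, now: 1_000.0) # job due in 300 seconds
```

With these inputs the immediate job keeps a 600-second lock, while the job scheduled 300 seconds out gets 900 seconds, so the lock spans both the wait and the execution window.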
Defined Under Namespace
Classes: ClientMiddleware, ServerMiddleware
Constant Summary
- KEY_PREFIX =
'unique:'
- DEFAULT_UNTIL =
:success
- VALID_UNTIL =
%i[success start].freeze
- DEATH_HANDLER =
Ent parity: a job that dies automatically releases its lock so a duplicate can enqueue immediately. Manual API/UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs) — they reach death handlers too (Sidekiq fires them on API kills), so we recognize the synthesized kill exception and skip the release for it. Atomic CAS-DEL via the shared Lua script mirrors ServerMiddleware#release.
    lambda do |job, exception|
      next unless Wurk::Unique.enabled?
      next unless Wurk::Unique.coerce_ttl(job['unique_for'])
      next if exception.instance_of?(::RuntimeError) && exception.message == DeadSet::API_KILL_MESSAGE

      Wurk.redis { |conn| Wurk::Unique.release_if_owner(conn, Wurk::Unique.lock_key_for(job), job['jid']) }
    end
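The manual-kill guard in the handler can be tried in isolation. This is a minimal sketch that assumes the comparison is made against the exception's message; `manual_kill?`, `CustomError`, and the placeholder message text are hypothetical, and the real value of `DeadSet::API_KILL_MESSAGE` is not shown on this page.

```ruby
# Placeholder text only; the real DeadSet::API_KILL_MESSAGE is not shown here.
API_KILL_MESSAGE = 'hypothetical manual kill message'

# Hypothetical subclass, used to show that instance_of? ignores subclasses.
class CustomError < RuntimeError; end

# True when the handler would skip the release and leave the lock to expire.
def manual_kill?(exception)
  exception.instance_of?(RuntimeError) && exception.message == API_KILL_MESSAGE
end

manual_kill?(RuntimeError.new(API_KILL_MESSAGE)) # exact class and message
manual_kill?(CustomError.new(API_KILL_MESSAGE))  # subclass, so not a match
manual_kill?(RuntimeError.new('boom'))           # ordinary failure
```

Because the check uses `instance_of?` rather than `is_a?`, an application error that subclasses `RuntimeError` still releases the lock even if its message happens to match.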
Class Method Summary
- .coerce_ttl(value) ⇒ Object
Coerce `unique_for` to a numeric seconds value.
- .disable! ⇒ Object
Test helper, not part of the public Sidekiq surface.
- .duration_like?(value) ⇒ Boolean
- .enable! ⇒ Object
- .enabled? ⇒ Boolean
- .lock_key(klass, queue, args) ⇒ Object
Compute the lock key for an arbitrary `(klass, queue, args)` triple.
- .lock_key_for(job) ⇒ Object
Compute the lock key from a job payload, honoring `sidekiq_unique_context` when the worker class is loaded and defines it.
- .locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?
Owning jid, or nil when the lock is free.
- .release_if_owner(conn, key, jid) ⇒ Object
Atomic compare-and-delete of a unique lock key.
- .unique_context(job) ⇒ Object
Default: `[class, queue, args]`.
Class Method Details
.coerce_ttl(value) ⇒ Object
Coerce `unique_for` to a numeric seconds value. Accepts Integer, Numeric, ActiveSupport::Duration (any `to_i`-respondent), or `false` (skip). Returns nil when uniqueness should be skipped.
    # File 'lib/wurk/unique.rb', line 132
    def self.coerce_ttl(value)
      return nil if value.nil? || value == false
      return value if value.is_a?(Integer) && value.positive?
      return value.to_i if value.is_a?(Numeric)
      return value.to_i if duration_like?(value)

      nil
    end
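To see how the branches behave on concrete inputs, the sketch below uses local copies of `coerce_ttl` and `duration_like?` as listed on this page, so it runs without Wurk. `FakeDuration` is a hypothetical stand-in for `ActiveSupport::Duration`.

```ruby
# Local copy of duration_like? as listed on this page.
def duration_like?(value)
  return false unless value.respond_to?(:to_i)

  value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
end

# Local copy of coerce_ttl as listed on this page.
def coerce_ttl(value)
  return nil if value.nil? || value == false
  return value if value.is_a?(Integer) && value.positive?
  return value.to_i if value.is_a?(Numeric)
  return value.to_i if duration_like?(value)

  nil
end

# Hypothetical stand-in for ActiveSupport::Duration: responds to to_i and since.
FakeDuration = Struct.new(:seconds) do
  def to_i
    seconds
  end

  def since(time = Time.now)
    time + seconds
  end
end

coerce_ttl(600)                    # positive Integer passes through
coerce_ttl(90.7)                   # other Numerics are truncated with to_i
coerce_ttl(FakeDuration.new(3600)) # duration-like objects use to_i
coerce_ttl(false)                  # explicit opt-out
coerce_ttl('600')                  # Strings are not duration-like
```

As the listing is written, a zero or negative Integer falls through to the Numeric branch and is returned as-is rather than as nil, so callers that treat any non-nil result as "unique" may want to check for that case.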
.disable! ⇒ Object
Test helper — not part of the public Sidekiq surface. Clears the flag so per-test enable!/disable! does not leak across runs.
    # File 'lib/wurk/unique.rb', line 76
    def disable!
      @enabled = false
      nil
    end
.duration_like?(value) ⇒ Boolean
    # File 'lib/wurk/unique.rb', line 141
    def self.duration_like?(value)
      return false unless value.respond_to?(:to_i)

      value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
    end
.enable! ⇒ Object
    # File 'lib/wurk/unique.rb', line 68
    def enable! # rubocop:disable Naming/PredicateMethod
      @enabled = true
      register_middleware!
      true
    end
.enabled? ⇒ Boolean
    # File 'lib/wurk/unique.rb', line 64
    def enabled?
      @enabled == true
    end
.lock_key(klass, queue, args) ⇒ Object
Compute the lock key for an arbitrary `(klass, queue, args)` triple. Used by both the client middleware and the public `locked?` probe so they cannot drift.
    # File 'lib/wurk/unique.rb', line 84
    def lock_key(klass, queue, args)
      context = [klass.to_s, queue.to_s, args]
      "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
    end
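The key derivation can be reproduced with the standard library alone. The sketch below uses a local copy of `lock_key` so it runs without Wurk; `HardJob` is a hypothetical worker class.

```ruby
require 'digest'
require 'json'

KEY_PREFIX = 'unique:'

# Local copy of lock_key as listed on this page.
def lock_key(klass, queue, args)
  context = [klass.to_s, queue.to_s, args]
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

class HardJob; end # hypothetical worker class

# Class vs. String and Symbol vs. String normalize to the same key via to_s.
lock_key(HardJob, :default, [1, 'x']) == lock_key('HardJob', 'default', [1, 'x'])

# Argument order is part of the digest, so these keys differ.
lock_key('HardJob', 'default', [1, 'x']) == lock_key('HardJob', 'default', ['x', 1])
```

Since the digest is taken over the JSON text, Hash arguments serialize in insertion order: `{ 'a' => 1, 'b' => 2 }` and `{ 'b' => 2, 'a' => 1 }` produce different keys even though they compare equal in Ruby.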
.lock_key_for(job) ⇒ Object
Compute the lock key from a job payload, honoring `sidekiq_unique_context` when the worker class is loaded and defines it.
    # File 'lib/wurk/unique.rb', line 92
    def lock_key_for(job)
      context = unique_context(job)
      "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
    end
.locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?
Returns owning jid, or nil when the lock is free.
    # File 'lib/wurk/unique.rb', line 152
    def self.locked?(queue_or_klass, klass_or_args = nil, args = nil)
      queue, klass, payload = normalize_locked_args(queue_or_klass, klass_or_args, args)
      key = lock_key(klass, queue, payload)
      Wurk.redis { |c| c.call('GET', key) }
    end
.release_if_owner(conn, key, jid) ⇒ Object
Atomic compare-and-delete of a unique lock key. Two-command GET-then-DEL is not a real CAS: the key can expire between the GET and DEL, letting a fresh enqueue grab it, and the bare DEL would then drop the new owner's lock. Routed through a single Lua script (`Wurk::Lua::RELEASE_IF_OWNER`) shared by `ServerMiddleware#release` (normal success/start release) and `DEATH_HANDLER` (automatic-death release) so the two paths cannot drift.
    # File 'lib/wurk/unique.rb', line 55
    def self.release_if_owner(conn, key, jid)
      Wurk::Lua::Loader.eval_cached(conn, :release_if_owner, keys: [key], argv: [jid])
    end
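The race described above can be replayed deterministically against an in-memory stand-in for Redis. This is only an illustrative sketch: `FakeStore`, `naive_race`, and `atomic_race` are hypothetical names, the interleaving is scripted by hand, and the body of the real Lua script is not shown on this page.

```ruby
# In-memory stand-in for Redis; single-threaded so the interleaving is explicit.
class FakeStore
  def initialize
    @data = {}
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end

  def del(key)
    @data.key?(key) ? (@data.delete(key) && 1) : 0
  end

  # Compare and delete in one step, as an atomic script would.
  def release_if_owner(key, jid)
    return 0 unless @data[key] == jid

    del(key)
  end
end

KEY = 'unique:abc' # placeholder key

# GET, then expiry and re-acquisition by another job, then a bare DEL.
def naive_race(store)
  store.set(KEY, 'jid-1')
  owner = store.get(KEY)             # first worker reads itself as owner
  store.del(KEY)                     # simulated TTL expiry
  store.set(KEY, 'jid-2')            # fresh enqueue takes the lock
  store.del(KEY) if owner == 'jid-1' # stale DEL removes jid-2's lock
  store.get(KEY)
end

# Same interleaving, but the release re-checks ownership at delete time.
def atomic_race(store)
  store.set(KEY, 'jid-1')
  store.del(KEY)                       # simulated TTL expiry
  store.set(KEY, 'jid-2')              # fresh enqueue takes the lock
  store.release_if_owner(KEY, 'jid-1') # no-op: current owner is jid-2
  store.get(KEY)
end
```

In the naive run the key ends up missing, so a third duplicate could be enqueued while `jid-2` is still pending; in the compare-and-delete run `jid-2` keeps its lock.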
.unique_context(job) ⇒ Object
Default: `[class, queue, args]`. Workers may override by defining `self.sidekiq_unique_context(job)` returning any JSON-serializable value (e.g. a subset of args). Spec §3.5.
    # File 'lib/wurk/unique.rb', line 100
    def unique_context(job)
      klass = resolve_class(job['class'])
      if klass.respond_to?(:sidekiq_unique_context)
        klass.sidekiq_unique_context(job)
      else
        [job['class'], job['queue'], job['args']]
      end
    end
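A worker-side override might look like the sketch below, which dedupes on the first argument only. `SyncAccountJob` is a hypothetical worker, and `lock_key_for` here is a simplified local stand-in that resolves the class with `Object.const_get`; Wurk's own `resolve_class` may behave differently.

```ruby
require 'digest'
require 'json'

# Hypothetical worker: uniqueness depends on the account id, not the second arg.
class SyncAccountJob
  def self.sidekiq_unique_context(job)
    [job['class'], job['queue'], job['args'].first]
  end
end

# Simplified stand-in for unique_context + lock_key_for (assumption: class
# lookup via Object.const_get, nil when the class is not loaded).
def lock_key_for(job)
  name = job['class']
  klass = Object.const_defined?(name) ? Object.const_get(name) : nil
  context =
    if klass.respond_to?(:sidekiq_unique_context)
      klass.sidekiq_unique_context(job)
    else
      [job['class'], job['queue'], job['args']]
    end
  "unique:#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

# Builds a minimal job payload for the examples.
def payload(klass, args)
  { 'class' => klass, 'queue' => 'default', 'args' => args }
end

# Same account id, different second argument: the keys match.
lock_key_for(payload('SyncAccountJob', [42, 'full'])) ==
  lock_key_for(payload('SyncAccountJob', [42, 'delta']))
```

A class that is not loaded in the enqueuing process falls back to the default `[class, queue, args]` context, so the same job could hash differently depending on where the key is computed.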