Module: Undertow::Buffer
Defined in: lib/undertow/buffer.rb
Overview
Low-level Redis SET operations used by Trackable callbacks and DrainJob. All methods are no-ops when tracking is disabled or Redis is unavailable.
Class Method Summary
- .acquire_drain_lock(ttl: 30) ⇒ Object
  Acquire the distributed drain lock using SET NX.
- .deregister_model(model_name) ⇒ Object
- .pending? ⇒ Boolean
- .pending_model_names ⇒ Object
- .pop_deleted(model_name, count) ⇒ Object
- .pop_pending(model_name, count) ⇒ Object
- .push_deleted(model_name, ids) ⇒ Object
- .push_pending(model_name, ids) ⇒ Object
- .release_drain_lock ⇒ Object
  Release the drain lock.
- .remaining(model_name) ⇒ Object
- .reregister_model(model_name) ⇒ Object
- .restore_deleted(model_name, ids) ⇒ Object
- .restore_pending(model_name, ids) ⇒ Object
Class Method Details
.acquire_drain_lock(ttl: 30) ⇒ Object
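Acquire the distributed drain lock using SET NX. Returns true if the lock was acquired, false if it was already held. Also returns true, without touching Redis, when no drain_lock_key is configured. The ttl (in seconds, passed as the EX option) is a safety valve in case the job process dies before releasing the lock.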
# File 'lib/undertow/buffer.rb', line 68
def acquire_drain_lock(ttl: 30)
  lock_key = Undertow.configuration.drain_lock_key
  return true unless lock_key

  with_redis { |r| r.set(lock_key, '1', nx: true, ex: ttl) } || false
end
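The SET NX plus TTL behaviour described above can be tried without a Redis server. The following is a minimal sketch, not Undertow code: `FakeRedis`, its injectable `clock`, and the key name `undertow:drain_lock` are all hypothetical stand-ins that only mimic the true/false contract of `set(..., nx: true, ex: ttl)`.

```ruby
# Hypothetical in-memory stand-in for the Redis commands the drain lock uses.
# It is an illustration only and is not part of Undertow or redis-rb.
class FakeRedis
  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @store = {} # key => [value, expires_at or nil]
  end

  # Mimics SET key value [NX] [EX seconds].
  # With nx: true it writes only when the key is absent (or has expired)
  # and reports the outcome as true/false.
  def set(key, value, nx: false, ex: nil)
    purge(key)
    return false if nx && @store.key?(key)

    @store[key] = [value, ex ? @clock.call + ex : nil]
    nx ? true : 'OK'
  end

  # Mimics DEL for a single key: returns the number of keys removed.
  def del(key)
    purge(key)
    @store.delete(key) ? 1 : 0
  end

  private

  # Drops the key if its TTL has elapsed.
  def purge(key)
    _value, expires_at = @store[key]
    @store.delete(key) if expires_at && @clock.call >= expires_at
  end
end

now = 0.0
redis = FakeRedis.new(clock: -> { now })
lock_key = 'undertow:drain_lock' # assumed key name

first  = redis.set(lock_key, '1', nx: true, ex: 30) # lock is free
second = redis.set(lock_key, '1', nx: true, ex: 30) # lock is already held
now += 31                                           # holder died, TTL elapsed
third  = redis.set(lock_key, '1', nx: true, ex: 30) # lock is free again

puts [first, second, third].inspect
```

The third call succeeding is the "safety valve": if the process holding the lock never calls release, the key expires and a later caller can acquire it.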
.deregister_model(model_name) ⇒ Object
# File 'lib/undertow/buffer.rb', line 38
def deregister_model(model_name)
  with_redis { |r| r.srem(Registry::MODELS_KEY, model_name) }
end
.pending? ⇒ Boolean
# File 'lib/undertow/buffer.rb', line 61
def pending?
  with_redis { |r| r.scard(Registry::MODELS_KEY) > 0 } || false
end
.pending_model_names ⇒ Object
# File 'lib/undertow/buffer.rb', line 34
def pending_model_names
  with_redis { |r| r.smembers(Registry::MODELS_KEY) } || []
end
.pop_deleted(model_name, count) ⇒ Object
# File 'lib/undertow/buffer.rb', line 30
def pop_deleted(model_name, count)
  with_redis { |r| r.spop(Registry.deleted_key(model_name), count) } || []
end
.pop_pending(model_name, count) ⇒ Object
# File 'lib/undertow/buffer.rb', line 26
def pop_pending(model_name, count)
  with_redis { |r| r.spop(Registry.pending_key(model_name), count) } || []
end
.push_deleted(model_name, ids) ⇒ Object
# File 'lib/undertow/buffer.rb', line 17
def push_deleted(model_name, ids)
  return if Undertow.tracking_disabled?

  with_redis do |r|
    r.sadd(Registry.deleted_key(model_name), ids)
    r.sadd(Registry::MODELS_KEY, model_name)
  end
end
.push_pending(model_name, ids) ⇒ Object
# File 'lib/undertow/buffer.rb', line 8
def push_pending(model_name, ids)
  return if Undertow.tracking_disabled?

  with_redis do |r|
    r.sadd(Registry.pending_key(model_name), ids)
    r.sadd(Registry::MODELS_KEY, model_name)
  end
end
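Because the buffer is built on Redis SETs, pushing the same ID twice stores it once, and popping removes members so each ID is handed out a single time. The sketch below illustrates those semantics with Ruby's `Set` in place of Redis. `MemoryBuffer` and the key names are hypothetical; only the method names mirror the ones documented here, and IDs are stored as strings because that is how Redis returns set members.

```ruby
require 'set'

# Hypothetical in-memory analogue of push_pending / pop_pending.
# It only illustrates SADD / SPOP semantics and is not Undertow code.
class MemoryBuffer
  MODELS_KEY = 'undertow:models' # assumed key name

  def initialize
    @sets = Hash.new { |hash, key| hash[key] = Set.new }
  end

  # Like SADD: duplicates collapse, and the model name is registered.
  def push_pending(model_name, ids)
    @sets[pending_key(model_name)].merge(ids.map(&:to_s))
    @sets[MODELS_KEY] << model_name
  end

  # Like SPOP key count: removes and returns up to `count` random members.
  def pop_pending(model_name, count)
    set = @sets[pending_key(model_name)]
    popped = set.to_a.sample(count)
    set.subtract(popped)
    popped
  end

  # Like SMEMBERS on the registry set.
  def pending_model_names
    @sets[MODELS_KEY].to_a
  end

  private

  def pending_key(model_name)
    "undertow:pending:#{model_name}" # assumed key layout
  end
end

buffer = MemoryBuffer.new
buffer.push_pending('Post', [1, 2, 2, 3]) # the duplicate 2 is stored once
buffer.push_pending('Post', [3])          # already present, no effect

batch = buffer.pop_pending('Post', 2)     # two of the three IDs, in no fixed order
rest  = buffer.pop_pending('Post', 10)    # whatever is left (one ID here)
puts (batch + rest).sort.inspect
```

Asking for more members than the set holds is safe: the pop simply returns what remains, and an empty array once the set is drained.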
.release_drain_lock ⇒ Object
Release the drain lock. Called at the start of DrainJob#perform so the scheduler can enqueue another job for IDs that arrive while this one runs.
# File 'lib/undertow/buffer.rb', line 77
def release_drain_lock
  lock_key = Undertow.configuration.drain_lock_key
  return unless lock_key

  with_redis { |r| r.del(lock_key) }
end
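Releasing the lock at the start of the job, rather than at the end, is what allows a follow-up job to be queued while the current one is still draining. The sketch below models that ordering with plain Ruby objects. `MemoryLock`, `schedule`, and `perform` are hypothetical names chosen for illustration; the real scheduler and DrainJob are not shown on this page and may differ.

```ruby
# Hypothetical in-memory lock with the same true/false contract as the
# drain lock. It has no TTL and is not Undertow code.
class MemoryLock
  def initialize
    @held = false
  end

  # Returns true when the lock was free, false when it was already held.
  def acquire
    return false if @held

    @held = true
  end

  def release
    @held = false
  end
end

# Hypothetical scheduler step: enqueue a job only if the lock was free.
def schedule(lock, queue)
  return false unless lock.acquire

  queue << :drain_job
  true
end

# Hypothetical job body: release first, then run the (possibly slow) drain.
def perform(lock)
  lock.release
  yield if block_given?
end

lock  = MemoryLock.new
queue = []

schedule(lock, queue) # enqueues the first job and takes the lock
schedule(lock, queue) # skipped: a job is already queued
queue.shift           # a worker picks the job up

perform(lock) do
  # IDs arriving while the drain runs can trigger a new job,
  # because the lock was released before the drain started.
  schedule(lock, queue)
end

puts queue.inspect
```

Had `perform` released the lock only after the block finished, the `schedule` call inside the block would have been skipped and the newly arrived IDs would wait for a later scheduling pass.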
.remaining(model_name) ⇒ Object
# File 'lib/undertow/buffer.rb', line 46
def remaining(model_name)
  with_redis do |r|
    r.scard(Registry.pending_key(model_name)) + r.scard(Registry.deleted_key(model_name))
  end || 0
end
.reregister_model(model_name) ⇒ Object
# File 'lib/undertow/buffer.rb', line 42
def reregister_model(model_name)
  with_redis { |r| r.sadd(Registry::MODELS_KEY, model_name) }
end
.restore_deleted(model_name, ids) ⇒ Object
# File 'lib/undertow/buffer.rb', line 57
def restore_deleted(model_name, ids)
  with_redis { |r| r.sadd(Registry.deleted_key(model_name), ids) } if ids.any?
end
.restore_pending(model_name, ids) ⇒ Object
# File 'lib/undertow/buffer.rb', line 53
def restore_pending(model_name, ids)
  with_redis { |r| r.sadd(Registry.pending_key(model_name), ids) } if ids.any?
end
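One plausible use of the restore methods is to put a popped batch back when processing fails, so the IDs are retried on a later drain. The sketch below shows that pattern under stated assumptions: `TinyBuffer` and `drain_batch` are hypothetical, and whether DrainJob handles failures exactly this way is not confirmed by this page.

```ruby
require 'set'

# Hypothetical stand-in exposing only pop_pending / restore_pending.
# It is an illustration and not Undertow code.
class TinyBuffer
  def initialize(ids)
    @pending = Set.new(ids)
  end

  # Removes and returns up to `count` IDs.
  def pop_pending(_model_name, count)
    popped = @pending.first(count)
    @pending.subtract(popped)
    popped
  end

  # Same guard as the documented method: do nothing for an empty list.
  def restore_pending(_model_name, ids)
    @pending.merge(ids) if ids.any?
  end

  def size
    @pending.size
  end
end

# Pops one batch, yields it to the caller, and restores the IDs if the
# block raises. Returns the number of IDs processed.
def drain_batch(buffer, model_name, batch_size)
  ids = buffer.pop_pending(model_name, batch_size)
  return 0 if ids.empty?

  yield ids
  ids.size
rescue StandardError
  buffer.restore_pending(model_name, ids) if ids
  raise
end

buffer = TinyBuffer.new(%w[1 2 3])

begin
  drain_batch(buffer, 'Post', 2) { |_ids| raise 'indexing failed' }
rescue RuntimeError
  # The failed batch went back into the buffer before the error propagated.
end

puts buffer.size
```

Because the underlying structure is a set, restoring an ID that was pushed again in the meantime does not create a duplicate.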