Module: Undertow::Buffer

Defined in:
lib/undertow/buffer.rb

Overview

Low-level Redis SET operations used by Trackable callbacks and DrainJob. All methods are no-ops when tracking is disabled or Redis is unavailable.

Class Method Summary collapse

Class Method Details

.acquire_drain_lock(ttl: 30) ⇒ Object

Acquire the distributed drain lock using SET NX. Returns true if the lock was acquired, false if it was already held. TTL is a safety valve in case the job process dies before releasing it.



68
69
70
71
72
73
# File 'lib/undertow/buffer.rb', line 68

def acquire_drain_lock(ttl: 30)
  lock_key = Undertow.configuration.drain_lock_key
  return true unless lock_key

  with_redis { |r| r.set(lock_key, '1', nx: true, ex: ttl) } || false
end

.deregister_model(model_name) ⇒ Object



38
39
40
# File 'lib/undertow/buffer.rb', line 38

def deregister_model(model_name)
  with_redis { |r| r.srem(Registry::MODELS_KEY, model_name) }
end

.pending?Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/undertow/buffer.rb', line 61

def pending?
  with_redis { |r| r.scard(Registry::MODELS_KEY) > 0 } || false
end

.pending_model_namesObject



34
35
36
# File 'lib/undertow/buffer.rb', line 34

def pending_model_names
  with_redis { |r| r.smembers(Registry::MODELS_KEY) } || []
end

.pop_deleted(model_name, count) ⇒ Object



30
31
32
# File 'lib/undertow/buffer.rb', line 30

def pop_deleted(model_name, count)
  with_redis { |r| r.spop(Registry.deleted_key(model_name), count) } || []
end

.pop_pending(model_name, count) ⇒ Object



26
27
28
# File 'lib/undertow/buffer.rb', line 26

def pop_pending(model_name, count)
  with_redis { |r| r.spop(Registry.pending_key(model_name), count) } || []
end

.push_deleted(model_name, ids) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/undertow/buffer.rb', line 17

def push_deleted(model_name, ids)
  return if Undertow.tracking_disabled?

  with_redis do |r|
    r.sadd(Registry.deleted_key(model_name), ids)
    r.sadd(Registry::MODELS_KEY, model_name)
  end
end

.push_pending(model_name, ids) ⇒ Object



8
9
10
11
12
13
14
15
# File 'lib/undertow/buffer.rb', line 8

def push_pending(model_name, ids)
  return if Undertow.tracking_disabled?

  with_redis do |r|
    r.sadd(Registry.pending_key(model_name), ids)
    r.sadd(Registry::MODELS_KEY, model_name)
  end
end

.release_drain_lockObject

Release the drain lock. Called at the start of DrainJob#perform so the scheduler can enqueue another job for IDs that arrive while this one runs.



77
78
79
80
81
82
# File 'lib/undertow/buffer.rb', line 77

def release_drain_lock
  lock_key = Undertow.configuration.drain_lock_key
  return unless lock_key

  with_redis { |r| r.del(lock_key) }
end

.remaining(model_name) ⇒ Object



46
47
48
49
50
51
# File 'lib/undertow/buffer.rb', line 46

def remaining(model_name)
  with_redis do |r|
    r.scard(Registry.pending_key(model_name)) +
    r.scard(Registry.deleted_key(model_name))
  end || 0
end

.reregister_model(model_name) ⇒ Object



42
43
44
# File 'lib/undertow/buffer.rb', line 42

def reregister_model(model_name)
  with_redis { |r| r.sadd(Registry::MODELS_KEY, model_name) }
end

.restore_deleted(model_name, ids) ⇒ Object



57
58
59
# File 'lib/undertow/buffer.rb', line 57

def restore_deleted(model_name, ids)
  with_redis { |r| r.sadd(Registry.deleted_key(model_name), ids) } if ids.any?
end

.restore_pending(model_name, ids) ⇒ Object



53
54
55
# File 'lib/undertow/buffer.rb', line 53

def restore_pending(model_name, ids)
  with_redis { |r| r.sadd(Registry.pending_key(model_name), ids) } if ids.any?
end