Module: RubyReactor::Storage::RedisLocking

Included in:
RedisAdapter
Defined in:
lib/ruby_reactor/storage/redis_locking.rb

Overview

Adapter contract uses non-‘?` names for methods that return booleans (`lock_acquire`, `semaphore_release`, etc). Renaming would break the public storage adapter API, so silence the predicate-name cop here. rubocop:disable Naming/PredicateMethod

Constant Summary collapse

LOCK_ACQUIRE_SCRIPT =

Scripts for Lock Primitives

<<~LUA
  local key = KEYS[1]
  local owner = ARGV[1]
  local ttl = tonumber(ARGV[2])

  if redis.call('exists', key) == 0 then
    redis.call('hset', key, 'owner', owner)
    redis.call('hset', key, 'count', 1)
    redis.call('expire', key, ttl)
    return 1
  elseif redis.call('hget', key, 'owner') == owner then
    redis.call('hincrby', key, 'count', 1)
    redis.call('expire', key, ttl)
    return 1
  else
    return 0
  end
LUA
LOCK_RELEASE_SCRIPT =
<<~LUA
  local key = KEYS[1]
  local owner = ARGV[1]

  if redis.call('hget', key, 'owner') == owner then
    local new_count = redis.call('hincrby', key, 'count', -1)
    if new_count <= 0 then
      redis.call('del', key)
    end
    return 1
  else
    return 0
  end
LUA
LOCK_EXTEND_SCRIPT =
<<~LUA
  local key = KEYS[1]
  local owner = ARGV[1]
  local ttl = tonumber(ARGV[2])

  if redis.call('hget', key, 'owner') == owner then
    redis.call('expire', key, ttl)
    return 1
  else
    return 0
  end
LUA
SEM_ACQUIRE_SCRIPT =

Semaphore Primitives

Storage layout:

LIST   <key>       — available token UUIDs
SET    <key>:held  — tokens currently checked out
STRING <key>:init  — sentinel marking that init has run; value = limit

Tokens are unique UUIDs so release can verify the caller actually holds one before pushing it back, blocking double-release and over-cap RPUSH.

<<~LUA
  local list_key = KEYS[1]
  local held_key = KEYS[2]
  local token = redis.call('lpop', list_key)
  if not token then return false end
  redis.call('sadd', held_key, token)
  return token
LUA
SEM_RELEASE_SCRIPT =
<<~LUA
  local list_key = KEYS[1]
  local held_key = KEYS[2]
  local limit = tonumber(ARGV[1])
  local token = ARGV[2]
  if redis.call('srem', held_key, token) == 0 then
    return 0
  end
  if redis.call('llen', list_key) >= limit then
    return 0
  end
  redis.call('rpush', list_key, token)
  return 1
LUA
SEMAPHORE_TTL =
86_400
RATE_LIMIT_SCRIPT =

Rate Limit Primitives — fixed-window counter, supports multiple windows in one atomic call. Two-pass inside Lua so a miss on the Nth window does not leave the previous N-1 incremented.

ARGV layout: [now, period_1, limit_1, ttl_1, period_2, limit_2, ttl_2, …] KEYS: [bucket_key_1, bucket_key_2, …] Returns: [allowed (1|0), retry_after_seconds, failed_index]

<<~LUA
  local now = tonumber(ARGV[1])
  local n = #KEYS

  for i = 1, n do
    local base = 2 + (i - 1) * 3
    local period = tonumber(ARGV[base])
    local lim = tonumber(ARGV[base + 1])
    local count = tonumber(redis.call('get', KEYS[i]) or '0')
    if count >= lim then
      local retry_after = period - (now % period)
      if retry_after <= 0 then retry_after = 1 end
      return {0, retry_after, i}
    end
  end

  for i = 1, n do
    local base = 2 + (i - 1) * 3
    local ttl = tonumber(ARGV[base + 2])
    local count = redis.call('incr', KEYS[i])
    if count == 1 then
      redis.call('expire', KEYS[i], ttl)
    end
  end

  return {1, 0, 0}
LUA

Instance Method Summary collapse

Instance Method Details

#lock_acquire(key, owner, ttl) ⇒ Object

Lock Primitives



60
61
62
63
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 60

def lock_acquire(key, owner, ttl)
  result = @redis.eval(LOCK_ACQUIRE_SCRIPT, keys: [key], argv: [owner, ttl])
  result == 1
end

#lock_extend(key, owner, ttl) ⇒ Object



70
71
72
73
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 70

def lock_extend(key, owner, ttl)
  result = @redis.eval(LOCK_EXTEND_SCRIPT, keys: [key], argv: [owner, ttl])
  result == 1
end

#lock_info(prefixed_key) ⇒ Object

Returns { owner:, count: } for a held lock, or nil if free. ‘prefixed_key` is the full key (e.g. “lock:order:42”).



215
216
217
218
219
220
221
222
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 215

def lock_info(prefixed_key)
  return nil unless @redis.exists?(prefixed_key)

  data = @redis.hgetall(prefixed_key)
  return nil if data.empty?

  { owner: data["owner"], count: data["count"].to_i }
end

#lock_release(key, owner) ⇒ Object



65
66
67
68
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 65

def lock_release(key, owner)
  result = @redis.eval(LOCK_RELEASE_SCRIPT, keys: [key], argv: [owner])
  result == 1
end

#period_mark(key, ttl) ⇒ Object



165
166
167
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 165

def period_mark(key, ttl)
  @redis.set(key, "1", ex: ttl)
end

#period_marker?(key_base, every, now: Time.now.utc) ⇒ Boolean

Has a period bucket been marked? ‘key_base` is the user’s ‘with_period` key, `every` is the period.

Returns:

  • (Boolean)


245
246
247
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 245

def period_marker?(key_base, every, now: Time.now.utc)
  @redis.exists?(RubyReactor::Period.key(key_base, every, now: now))
end

#period_seen?(key) ⇒ Boolean

Period Primitives — used by ‘with_period` to dedup bucketed runs.

Returns:

  • (Boolean)


161
162
163
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 161

def period_seen?(key)
  @redis.exists?(key)
end

#rate_limit_check_and_increment(keys, argv) ⇒ Object



205
206
207
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 205

def rate_limit_check_and_increment(keys, argv)
  @redis.eval(RATE_LIMIT_SCRIPT, keys: keys, argv: argv)
end

#rate_limit_count(key_base, every, now: Time.now.to_i) ⇒ Object

Current count for a rate-limit bucket. ‘key_base` is the user’s ‘with_rate_limit` key, `every` is the period (symbol or seconds).



237
238
239
240
241
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 237

def rate_limit_count(key_base, every, now: Time.now.to_i)
  period_seconds = RubyReactor::Period.period_seconds(every)
  bucket = now / period_seconds
  @redis.get("rate:#{key_base}:#{every}:#{bucket}").to_i
end

#semaphore_acquire(key, timeout: 0) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 126

def semaphore_acquire(key, timeout: 0)
  held_key = "#{key}:held"

  token = if timeout.to_f.positive?
            result = @redis.blpop(key, timeout: timeout)
            next_token = result&.last
            @redis.sadd(held_key, next_token) if next_token
            next_token
          else
            @redis.eval(SEM_ACQUIRE_SCRIPT, keys: [key, held_key], argv: [])
          end

  return nil unless token

  @redis.expire(held_key, SEMAPHORE_TTL)
  token
end

#semaphore_exists?(key) ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 155

def semaphore_exists?(key)
  @redis.exists?("#{key}:init")
end

#semaphore_init(key, limit) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 111

def semaphore_init(key, limit)
  return false unless @redis.set("#{key}:init", limit, nx: true, ex: SEMAPHORE_TTL)

  tokens = Array.new(limit) { SecureRandom.uuid }
  @redis.rpush(key, tokens)
  @redis.expire(key, SEMAPHORE_TTL)
  true
end

#semaphore_release(key, token, limit) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 144

def semaphore_release(key, token, limit)
  return false unless token

  result = @redis.eval(
    SEM_RELEASE_SCRIPT,
    keys: [key, "#{key}:held"],
    argv: [limit, token]
  )
  result == 1
end

#semaphore_reset(key) ⇒ Object



120
121
122
123
124
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 120

def semaphore_reset(key)
  @redis.del(key)
  @redis.del("#{key}:held")
  @redis.del("#{key}:init")
end

#semaphore_state(name) ⇒ Object

Returns { available:, held:, limit: } for a semaphore. ‘name` is the user-provided semaphore key (without the “semaphore:” prefix).



226
227
228
229
230
231
232
233
# File 'lib/ruby_reactor/storage/redis_locking.rb', line 226

def semaphore_state(name)
  prefix = "semaphore:#{name}"
  {
    available: @redis.llen(prefix),
    held: @redis.scard("#{prefix}:held"),
    limit: @redis.get("#{prefix}:init").to_i
  }
end