Class: Kaal::Backend::RedisAdapter
- Includes:
- DispatchLogging
- Defined in:
- lib/kaal/backend/redis_adapter.rb
Overview
Distributed backend adapter using Redis.
This adapter uses Redis SET command with NX (only set if not exists) and PX (expire in milliseconds) options to implement atomic lock acquisition with automatic TTL-based expiration.
The lock value is a unique identifier (UUID) to allow safe release by preventing deletion of locks acquired by other processes.
Instance Method Summary collapse
-
#acquire(key, ttl) ⇒ Boolean
Attempt to acquire a distributed lock in Redis.
-
#definition_registry ⇒ Kaal::Definition::RedisEngine
Get the definition registry for Redis-backed definition persistence.
-
#dispatch_registry ⇒ Kaal::Dispatch::RedisEngine
Get the dispatch registry for Redis logging.
-
#initialize(redis, namespace: 'kaal') ⇒ RedisAdapter
constructor
Initialize a new Redis adapter.
-
#release(key) ⇒ Boolean
Release a distributed lock from Redis.
Methods included from DispatchLogging
#log_dispatch_attempt, #parse_lock_key, parse_lock_key
Methods inherited from Adapter
Constructor Details
#initialize(redis, namespace: 'kaal') ⇒ RedisAdapter
Initialize a new Redis adapter.
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/kaal/backend/redis_adapter.rb', line 38 def initialize(redis, namespace: 'kaal') super() raise ArgumentError, 'redis client must respond to :set and :eval' unless redis.respond_to?(:set) && redis.respond_to?(:eval) @redis = redis @namespace = namespace @lock_value_generator = -> { SecureRandom.uuid } # Store lock values with expiration timestamps to enable safe release and prevent unbounded memory growth. # Since lock keys include fire_time.to_i, each dispatch creates a unique key. In the coordinator's # normal flow, release is never called (TTL is relied upon), so we must expire local entries. @lock_values = {} @mutex = Mutex.new end |
Instance Method Details
#acquire(key, ttl) ⇒ Boolean
Attempt to acquire a distributed lock in Redis.
Uses SET key value NX PX ttl to atomically acquire the lock with TTL. Stores the lock value locally with an expiration time to enable safe release while preventing unbounded memory growth.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/kaal/backend/redis_adapter.rb', line 78 def acquire(key, ttl) lock_value = generate_lock_value ttl_ms = ttl * 1000 # SET key value NX PX ttl returns OK if set, nil if not set result = @redis.set(key, lock_value, nx: true, px: ttl_ms) acquired = ['OK', true].include?(result) if acquired @mutex.synchronize do @lock_values[key] = { value: lock_value, expires_at: Time.now.utc + ttl } prune_expired_lock_values end end log_dispatch_attempt(key) if acquired acquired rescue StandardError => e raise LockAdapterError, "Redis acquire failed for #{key}: #{e.}" end |
#definition_registry ⇒ Kaal::Definition::RedisEngine
Get the definition registry for Redis-backed definition persistence.
64 65 66 |
# File 'lib/kaal/backend/redis_adapter.rb', line 64 def definition_registry @definition_registry ||= Kaal::Definition::RedisEngine.new(@redis, namespace: @namespace) end |
#dispatch_registry ⇒ Kaal::Dispatch::RedisEngine
Get the dispatch registry for Redis logging.
56 57 58 |
# File 'lib/kaal/backend/redis_adapter.rb', line 56 def dispatch_registry @dispatch_registry ||= Kaal::Dispatch::RedisEngine.new(@redis, namespace: @namespace) end |
#release(key) ⇒ Boolean
Release a distributed lock from Redis.
Safely deletes the lock only if the stored value matches the value we set during acquire. This prevents releasing locks acquired by other processes.
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/kaal/backend/redis_adapter.rb', line 109 def release(key) lock_entry = @mutex.synchronize do @lock_values.delete(key) end return false unless lock_entry lock_value = lock_entry[:value] # Use a Lua script to delete only if value matches script = <<~LUA if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end LUA result = @redis.eval(script, keys: [key], argv: [lock_value]) [1, '1', true].include?(result) rescue StandardError => e raise LockAdapterError, "Redis release failed for #{key}: #{e.}" end |