Class: Kaal::Backend::RedisAdapter

Inherits:
Adapter
  • Object
Includes:
DispatchLogging
Defined in:
lib/kaal/backend/redis_adapter.rb

Overview

Distributed backend adapter using Redis.

This adapter uses the Redis SET command with the NX (set only if the key does not exist) and PX (expiry in milliseconds) options to implement atomic lock acquisition with automatic TTL-based expiration.

Each lock value is a unique identifier (UUID), so release can verify ownership and never deletes a lock acquired by another process.

Examples:

Using the Redis adapter

redis = Redis.new(url: ENV["REDIS_URL"])
Kaal.configure do |config|
  config.backend = Kaal::Backend::RedisAdapter.new(redis)
  config.enable_log_dispatch_registry = true  # Enable dispatch logging
end
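
The NX/PX behaviour described in the overview can be illustrated without a Redis server. The following is a minimal sketch, not the adapter's implementation: `FakeStore` and `set_nx_px` are hypothetical names, the key and token strings are made up, and time is passed in explicitly so the example is deterministic.

```ruby
# Hypothetical in-memory stand-in for the NX/PX subset of Redis SET.
# Timestamps are plain integers in milliseconds, supplied by the caller.
class FakeStore
  def initialize
    @data = {} # key => { value:, expires_at_ms: }
  end

  # Mimics SET key value NX PX ttl_ms: returns true if the key was set,
  # false if a live (unexpired) entry already exists.
  def set_nx_px(key, value, ttl_ms, now_ms)
    entry = @data[key]
    return false if entry && entry[:expires_at_ms] > now_ms

    @data[key] = { value: value, expires_at_ms: now_ms + ttl_ms }
    true
  end
end

store = FakeStore.new
first  = store.set_nx_px('job:1', 'token-a', 5_000, 0)     # key is free
second = store.set_nx_px('job:1', 'token-b', 5_000, 1_000) # still held
third  = store.set_nx_px('job:1', 'token-c', 5_000, 5_000) # TTL has elapsed
```

In this model the second caller is refused while the first entry is live, and the key becomes available again once the TTL has passed, which is the property the adapter relies on when release is never called.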

Instance Method Summary

Methods included from DispatchLogging

#log_dispatch_attempt, #parse_lock_key, parse_lock_key

Methods inherited from Adapter

#with_lock

Constructor Details

#initialize(redis, namespace: 'kaal') ⇒ RedisAdapter

Initialize a new Redis adapter.

Parameters:

  • redis (Object)

    a Redis-compatible client instance

  • namespace (String) (defaults to: 'kaal')

    namespace prefix for dispatch and definition registry keys

Raises:

  • (ArgumentError)

    if redis is not provided or does not respond to :set and :eval



# File 'lib/kaal/backend/redis_adapter.rb', line 38

def initialize(redis, namespace: 'kaal')
  super()
  raise ArgumentError, 'redis client must respond to :set and :eval' unless redis.respond_to?(:set) && redis.respond_to?(:eval)

  @redis = redis
  @namespace = namespace
  @lock_value_generator = -> { SecureRandom.uuid }
  # Store lock values with expiration timestamps to enable safe release and prevent unbounded memory growth.
  # Since lock keys include fire_time.to_i, each dispatch creates a unique key. In the coordinator's
  # normal flow, release is never called (TTL is relied upon), so we must expire local entries.
  @lock_values = {}
  @mutex = Mutex.new
end
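
The constructor's guard is a duck-typing check: any object that publicly responds to `set` and `eval` is accepted. The sketch below mirrors that check in isolation; `redis_like?` and `StubClient` are hypothetical names introduced for illustration and are not part of Kaal.

```ruby
# Mirrors the guard in #initialize: the client only needs #set and #eval.
def redis_like?(client)
  client.respond_to?(:set) && client.respond_to?(:eval)
end

# Hypothetical stand-in client with the two required public methods.
class StubClient
  def set(key, value, nx: false, px: nil); end
  def eval(script, keys: [], argv: []); end
end

stub_ok  = redis_like?(StubClient.new) # has public #set and #eval
plain_ok = redis_like?(Object.new)     # Kernel#eval is private, so this fails
```

Because `respond_to?` ignores private methods by default, a plain object does not pass merely because `Kernel#eval` exists.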

Instance Method Details

#acquire(key, ttl) ⇒ Boolean

Attempt to acquire a distributed lock in Redis.

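Uses SET key value NX PX ttl_ms to atomically acquire the lock with a TTL; the ttl argument is given in seconds and converted to milliseconds for PX. On success, the lock value is stored locally with an expiration time, which enables safe release while preventing unbounded memory growth.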

Parameters:

  • key (String)

    the lock key

  • ttl (Integer)

    time-to-live in seconds

Returns:

  • (Boolean)

    true if acquired, false if held by another process



# File 'lib/kaal/backend/redis_adapter.rb', line 78

def acquire(key, ttl)
  lock_value = generate_lock_value
  ttl_ms = ttl * 1000

  # SET key value NX PX ttl returns OK if set, nil if not set
  result = @redis.set(key, lock_value, nx: true, px: ttl_ms)

  acquired = ['OK', true].include?(result)

  if acquired
    @mutex.synchronize do
      @lock_values[key] = { value: lock_value, expires_at: Time.now.utc + ttl }
      prune_expired_lock_values
    end
  end

  log_dispatch_attempt(key) if acquired

  acquired
rescue StandardError => e
  raise LockAdapterError, "Redis acquire failed for #{key}: #{e.message}"
end
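
The private helper `prune_expired_lock_values` is not shown on this page. As a rough sketch of what such pruning could look like, assuming each entry has the `{ value:, expires_at: }` shape stored by `#acquire`, the hypothetical function below drops entries whose expiry has passed. The keys and tokens are made up.

```ruby
# Hypothetical pruning helper; the adapter's real implementation may differ.
# Removes every entry whose expires_at is at or before `now`.
def prune_expired(lock_values, now = Time.now.utc)
  lock_values.delete_if { |_key, entry| entry[:expires_at] <= now }
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
lock_values = {
  'job:a' => { value: 'token-a', expires_at: now - 1 },  # already expired
  'job:b' => { value: 'token-b', expires_at: now + 30 }  # still live
}
prune_expired(lock_values, now)
remaining_keys = lock_values.keys
```

Pruning on each successful acquire keeps the local hash bounded by the number of locks that are still within their TTL.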

#definition_registry ⇒ Kaal::Definition::RedisEngine

Get the definition registry for Redis-backed definition persistence.

Returns:

  • (Kaal::Definition::RedisEngine)

    the memoized definition registry instance



# File 'lib/kaal/backend/redis_adapter.rb', line 64

def definition_registry
  @definition_registry ||= Kaal::Definition::RedisEngine.new(@redis, namespace: @namespace)
end

#dispatch_registry ⇒ Kaal::Dispatch::RedisEngine

Get the dispatch registry used for Redis-backed dispatch logging.

Returns:

  • (Kaal::Dispatch::RedisEngine)

    the memoized dispatch registry instance



# File 'lib/kaal/backend/redis_adapter.rb', line 56

def dispatch_registry
  @dispatch_registry ||= Kaal::Dispatch::RedisEngine.new(@redis, namespace: @namespace)
end

#release(key) ⇒ Boolean

Release a distributed lock from Redis.

Safely deletes the lock only if the stored value matches the value we set during acquire. This prevents releasing locks acquired by other processes.

Parameters:

  • key (String)

    the lock key to release

Returns:

  • (Boolean)

    true if released (key was held with our value), false otherwise



# File 'lib/kaal/backend/redis_adapter.rb', line 109

def release(key)
  lock_entry = @mutex.synchronize do
    @lock_values.delete(key)
  end

  return false unless lock_entry

  lock_value = lock_entry[:value]

  # Use a Lua script to delete only if value matches
  script = <<~LUA
    if redis.call('get', KEYS[1]) == ARGV[1] then
      return redis.call('del', KEYS[1])
    else
      return 0
    end
  LUA

  result = @redis.eval(script, keys: [key], argv: [lock_value])
  [1, '1', true].include?(result)
rescue StandardError => e
  raise LockAdapterError, "Redis release failed for #{key}: #{e.message}"
end
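
The value check in the Lua script matters when a lock expires and is re-acquired before its original holder calls release. The sketch below models the script's logic in plain Ruby against a Hash standing in for Redis; `compare_and_delete` is a hypothetical name, and unlike the server-side script this model makes no claim about atomicity.

```ruby
require 'securerandom'

# Ruby model of the Lua script: delete the key only if its value matches.
# Returns 1 when the key was deleted and 0 otherwise, like the script.
def compare_and_delete(store, key, expected_value)
  return 0 unless store[key] == expected_value

  store.delete(key)
  1
end

store = {}
token_a = SecureRandom.uuid
token_b = SecureRandom.uuid

store['job:1'] = token_a # process A acquires the lock
store.delete('job:1')    # A's TTL elapses and the key disappears
store['job:1'] = token_b # process B acquires the same key

stale_release = compare_and_delete(store, 'job:1', token_a) # A releases late
b_still_holds = store.key?('job:1')
owner_release = compare_and_delete(store, 'job:1', token_b) # B releases
```

A's late release is refused because the stored value is B's token, so B's lock survives until B releases it or its TTL expires.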