Module: Legion::Cluster::Lock
- Defined in:
- lib/legion/cluster/lock.rb
Class Method Summary collapse
-
.acquire(name:, ttl: 30, timeout: 5) ⇒ Object
rubocop:disable Lint/UnusedMethodArgument.
- .acquire_postgres(name:) ⇒ Object
- .acquire_redis(name:, ttl:) ⇒ Object
- .backend ⇒ Object
- .delete_token(name) ⇒ Object
- .extend_lock(name:, token: nil, ttl: 30) ⇒ Object
- .extend_lock_redis(name:, token:, ttl:) ⇒ Object
- .fetch_token(name) ⇒ Object
- .lock_key(name) ⇒ Object
- .redis_key(name) ⇒ Object
- .release(name:, token: nil) ⇒ Object
- .release_postgres(name:) ⇒ Object
- .release_redis(name:, token:) ⇒ Object
- .store_token(name, token) ⇒ Object
- .tokens ⇒ Object
- .with_lock(name:, ttl: 30, timeout: 5) ⇒ Object
Class Method Details
.acquire(name:, ttl: 30, timeout: 5) ⇒ Object
rubocop:disable Lint/UnusedMethodArgument
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/legion/cluster/lock.rb', line 37 def acquire(name:, ttl: 30, timeout: 5) # rubocop:disable Lint/UnusedMethodArgument case backend when :redis acquire_redis(name: name, ttl: ttl) when :postgres acquire_postgres(name: name) else false end end |
.acquire_postgres(name:) ⇒ Object
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/legion/cluster/lock.rb', line 127 def acquire_postgres(name:) key = lock_key(name) db = Legion::Data.connection return false unless db db.fetch('SELECT pg_try_advisory_lock(?) AS acquired', key).first[:acquired] rescue StandardError => e Legion::Logging.debug "Lock#acquire_postgres failed for name=#{name}: #{e.}" if defined?(Legion::Logging) false end |
.acquire_redis(name:, ttl:) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/legion/cluster/lock.rb', line 91 def acquire_redis(name:, ttl:) client = Legion::Cache::Redis.client token = SecureRandom.hex(16) key = redis_key(name) result = client.call('SET', key, token, 'NX', 'PX', ttl * 1000) return nil unless result store_token(name, token) token rescue StandardError => e Legion::Logging.debug "Lock#acquire_redis failed for name=#{name}: #{e.}" if defined?(Legion::Logging) nil end |
.backend ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/legion/cluster/lock.rb', line 21 def backend if defined?(Legion::Cache) && Legion::Cache.respond_to?(:const_defined?) && Legion::Cache.const_defined?(:Redis, false) && Legion::Cache::Redis.respond_to?(:client) && !Legion::Cache::Redis.client.nil? :redis elsif defined?(Legion::Data) && Legion::Data.respond_to?(:connection) && !Legion::Data.connection.nil? :postgres else :none end end |
.delete_token(name) ⇒ Object
186 187 188 189 190 191 192 |
# File 'lib/legion/cluster/lock.rb', line 186 def delete_token(name) if defined?(Concurrent::Map) @tokens.delete(name.to_s) else @tokens_mutex.synchronize { @tokens.delete(name.to_s) } end end |
.extend_lock(name:, token: nil, ttl: 30) ⇒ Object
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/legion/cluster/lock.rb', line 59 def extend_lock(name:, token: nil, ttl: 30) case backend when :redis extend_lock_redis(name: name, token: token, ttl: ttl) when :postgres true else false end end |
.extend_lock_redis(name:, token:, ttl:) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/legion/cluster/lock.rb', line 138 def extend_lock_redis(name:, token:, ttl:) tok = token || fetch_token(name) return false unless tok client = Legion::Cache::Redis.client key = redis_key(name) lua = <<~LUA if redis.call('GET', KEYS[1]) == ARGV[1] then redis.call('PEXPIRE', KEYS[1], ARGV[2]) return 1 else return 0 end LUA result = client.call('EVAL', lua, 1, key, tok, (ttl * 1000).to_s) result == 1 rescue StandardError => e Legion::Logging.debug "Lock#extend_lock_redis failed for name=#{name}: #{e.}" if defined?(Legion::Logging) false end |
.fetch_token(name) ⇒ Object
178 179 180 181 182 183 184 |
# File 'lib/legion/cluster/lock.rb', line 178 def fetch_token(name) if defined?(Concurrent::Map) @tokens[name.to_s] else @tokens_mutex.synchronize { @tokens[name.to_s] } end end |
.lock_key(name) ⇒ Object
83 84 85 |
# File 'lib/legion/cluster/lock.rb', line 83 def lock_key(name) name.to_s.bytes.reduce(0) { |acc, b| ((acc * 31) + b) & 0x7FFFFFFF } end |
.redis_key(name) ⇒ Object
87 88 89 |
# File 'lib/legion/cluster/lock.rb', line 87 def redis_key(name) "legion:lock:#{name}" end |
.release(name:, token: nil) ⇒ Object
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/legion/cluster/lock.rb', line 48 def release(name:, token: nil) case backend when :redis release_redis(name: name, token: token) when :postgres release_postgres(name: name) else false end end |
.release_postgres(name:) ⇒ Object
159 160 161 162 163 164 165 166 167 168 |
# File 'lib/legion/cluster/lock.rb', line 159 def release_postgres(name:) key = lock_key(name) db = Legion::Data.connection return false unless db db.fetch('SELECT pg_advisory_unlock(?) AS released', key).first[:released] rescue StandardError => e Legion::Logging.debug "Lock#release_postgres failed for name=#{name}: #{e.}" if defined?(Legion::Logging) false end |
.release_redis(name:, token:) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/legion/cluster/lock.rb', line 105 def release_redis(name:, token:) client = Legion::Cache::Redis.client tok = token || fetch_token(name) return false unless tok key = redis_key(name) lua = <<~LUA if redis.call('GET', KEYS[1]) == ARGV[1] then redis.call('DEL', KEYS[1]) return 1 else return 0 end LUA result = client.call('EVAL', lua, 1, key, tok) delete_token(name) result == 1 rescue StandardError => e Legion::Logging.debug "Lock#release_redis failed for name=#{name}: #{e.}" if defined?(Legion::Logging) false end |
.store_token(name, token) ⇒ Object
170 171 172 173 174 175 176 |
# File 'lib/legion/cluster/lock.rb', line 170 def store_token(name, token) if defined?(Concurrent::Map) @tokens[name.to_s] = token else @tokens_mutex.synchronize { @tokens[name.to_s] = token } end end |
.tokens ⇒ Object
17 18 19 |
# File 'lib/legion/cluster/lock.rb', line 17 def tokens @tokens end |
.with_lock(name:, ttl: 30, timeout: 5) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/cluster/lock.rb', line 70 def with_lock(name:, ttl: 30, timeout: 5) acquired = acquire(name: name, ttl: ttl, timeout: timeout) return unless acquired token = acquired == true ? nil : acquired begin yield ensure release(name: name, token: token) end end |