Module: Legion::Cluster::Lock

Defined in:
lib/legion/cluster/lock.rb

Class Method Summary collapse

Class Method Details

.acquire(name:, ttl: 30, timeout: 5) ⇒ Object

rubocop:disable Lint/UnusedMethodArgument



37
38
39
40
41
42
43
44
45
46
# File 'lib/legion/cluster/lock.rb', line 37

# Attempts to take the lock called +name+ on whichever backend is active.
#
# @param name [#to_s] lock name
# @param ttl [Numeric] lifetime in seconds; only forwarded to the Redis backend
# @param timeout [Numeric] accepted for interface compatibility but not used here
# @return [Object] the result of +acquire_redis+ or +acquire_postgres+,
#   or +false+ when +backend+ reports neither :redis nor :postgres
def acquire(name:, ttl: 30, timeout: 5) # rubocop:disable Lint/UnusedMethodArgument
  active = backend
  return acquire_redis(name: name, ttl: ttl) if active == :redis
  return acquire_postgres(name: name) if active == :postgres

  false
end

.acquire_postgres(name:) ⇒ Object



127
128
129
130
131
132
133
134
135
136
# File 'lib/legion/cluster/lock.rb', line 127

# Tries to take a PostgreSQL advisory lock for +name+ without blocking.
#
# The lock id is the integer produced by +lock_key+.
# NOTE(review): advisory locks taken with pg_try_advisory_lock belong to the
# database session; confirm Legion::Data.connection uses the same session
# when the lock is later released.
#
# @param name [#to_s] lock name
# @return [Object] the +acquired+ column of the first result row;
#   +false+ when there is no connection or any StandardError is raised
def acquire_postgres(name:)
  advisory_id = lock_key(name)
  conn = Legion::Data.connection
  return false unless conn

  row = conn.fetch('SELECT pg_try_advisory_lock(?) AS acquired', advisory_id).first
  row[:acquired]
rescue StandardError => e
  if defined?(Legion::Logging)
    Legion::Logging.debug "Lock#acquire_postgres failed for name=#{name}: #{e.message}"
  end
  false
end

.acquire_redis(name:, ttl:) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/legion/cluster/lock.rb', line 91

# Tries to take the Redis lock for +name+ with a single SET NX PX command.
#
# A fresh random token is written as the key's value so that only the holder
# can later extend or release the lock; on success the token is also
# remembered through +store_token+.
#
# @param name [#to_s] lock name
# @param ttl [Numeric] lock lifetime in seconds (fractions allowed)
# @return [String, nil] the owner token when the lock was taken; +nil+ when
#   the SET reply is falsy or any StandardError is raised
def acquire_redis(name:, ttl:)
  client = Legion::Cache::Redis.client
  token = SecureRandom.hex(16)
  key = redis_key(name)
  # PX only accepts a whole number of milliseconds. A fractional TTL such as
  # 0.5 would otherwise be sent as 500.0, which Redis rejects, and the rescue
  # below would turn that into a silent acquisition failure.
  ttl_ms = (ttl * 1000).round
  result = client.call('SET', key, token, 'NX', 'PX', ttl_ms)
  return nil unless result

  store_token(name, token)
  token
rescue StandardError => e
  Legion::Logging.debug "Lock#acquire_redis failed for name=#{name}: #{e.message}" if defined?(Legion::Logging)
  nil
end

.backend ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/legion/cluster/lock.rb', line 21

# Reports which lock backend is currently usable.
#
# Redis is preferred: it is chosen when Legion::Cache::Redis is defined,
# responds to +client+ and that client is not nil. Otherwise Postgres is
# chosen when Legion::Data responds to +connection+ and it is not nil.
#
# @return [Symbol] :redis, :postgres or :none
def backend
  redis_ready = defined?(Legion::Cache) &&
                Legion::Cache.respond_to?(:const_defined?) &&
                Legion::Cache.const_defined?(:Redis, false) &&
                Legion::Cache::Redis.respond_to?(:client) &&
                !Legion::Cache::Redis.client.nil?
  return :redis if redis_ready

  postgres_ready = defined?(Legion::Data) &&
                   Legion::Data.respond_to?(:connection) &&
                   !Legion::Data.connection.nil?
  return :postgres if postgres_ready

  :none
end

.delete_token(name) ⇒ Object



186
187
188
189
190
191
192
# File 'lib/legion/cluster/lock.rb', line 186

# Forgets the token remembered for +name+.
#
# When Concurrent::Map is defined the store is accessed directly; otherwise
# the access is wrapped in @tokens_mutex.
#
# @param name [#to_s] lock name (stored under its string form)
# @return [Object, nil] whatever @tokens.delete returns for that key
def delete_token(name)
  key = name.to_s
  return @tokens.delete(key) if defined?(Concurrent::Map)

  @tokens_mutex.synchronize { @tokens.delete(key) }
end

.extend_lock(name:, token: nil, ttl: 30) ⇒ Object



59
60
61
62
63
64
65
66
67
68
# File 'lib/legion/cluster/lock.rb', line 59

# Pushes out the expiry of a held lock on the active backend.
#
# @param name [#to_s] lock name
# @param token [String, nil] owner token; forwarded to the Redis backend
# @param ttl [Numeric] new lifetime in seconds; forwarded to the Redis backend
# @return [Object] the result of +extend_lock_redis+ for :redis; +true+ for
#   :postgres (no query is issued on that path); +false+ otherwise
def extend_lock(name:, token: nil, ttl: 30)
  active = backend
  return extend_lock_redis(name: name, token: token, ttl: ttl) if active == :redis

  active == :postgres
end

.extend_lock_redis(name:, token:, ttl:) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/legion/cluster/lock.rb', line 138

# Resets the expiry of the Redis lock for +name+, but only if the caller
# still owns it.
#
# Ownership is checked and the expiry updated inside one Lua script, so the
# compare and the PEXPIRE happen in a single EVAL call.
#
# @param name [#to_s] lock name
# @param token [String, nil] owner token; falls back to +fetch_token(name)+
# @param ttl [Numeric] new lifetime in seconds (fractions allowed)
# @return [Boolean] +true+ when the script replied 1; +false+ when no token
#   is known, the stored value did not match, or any StandardError is raised
def extend_lock_redis(name:, token:, ttl:)
  tok = token || fetch_token(name)
  return false unless tok

  client = Legion::Cache::Redis.client
  key = redis_key(name)
  lua = <<~LUA
    if redis.call('GET', KEYS[1]) == ARGV[1] then
      redis.call('PEXPIRE', KEYS[1], ARGV[2])
      return 1
    else
      return 0
    end
  LUA
  # PEXPIRE needs a whole number of milliseconds; round first so a
  # fractional TTL is sent as "500" rather than "500.0".
  ttl_ms = (ttl * 1000).round.to_s
  result = client.call('EVAL', lua, 1, key, tok, ttl_ms)
  result == 1
rescue StandardError => e
  Legion::Logging.debug "Lock#extend_lock_redis failed for name=#{name}: #{e.message}" if defined?(Legion::Logging)
  false
end

.fetch_token(name) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/legion/cluster/lock.rb', line 178

# Looks up the token remembered for +name+.
#
# When Concurrent::Map is defined the store is read directly; otherwise the
# read is wrapped in @tokens_mutex.
#
# @param name [#to_s] lock name (looked up under its string form)
# @return [Object, nil] the stored token, or whatever @tokens[] yields for
#   an unknown key
def fetch_token(name)
  key = name.to_s
  return @tokens[key] if defined?(Concurrent::Map)

  @tokens_mutex.synchronize { @tokens[key] }
end

.lock_key(name) ⇒ Object



83
84
85
# File 'lib/legion/cluster/lock.rb', line 83

# Derives a non-negative integer lock id from +name+.
#
# Folds the bytes of +name.to_s+ with a multiply-by-31 polynomial hash and
# masks every step to 31 bits, so the result is always in 0..0x7FFFFFFF.
#
# @param name [#to_s] lock name
# @return [Integer] hash of the name; 0 for an empty name
def lock_key(name)
  name.to_s.each_byte.inject(0) do |hash, byte|
    (hash * 31 + byte) & 0x7FFFFFFF
  end
end

.redis_key(name) ⇒ Object



87
88
89
# File 'lib/legion/cluster/lock.rb', line 87

# Builds the Redis key under which the lock for +name+ is stored.
#
# @param name [#to_s] lock name, interpolated after the "legion:lock:" prefix
# @return [String] the namespaced key
def redis_key(name)
  "legion:lock:#{name}"
end

.release(name:, token: nil) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/legion/cluster/lock.rb', line 48

# Gives up the lock called +name+ on whichever backend is active.
#
# @param name [#to_s] lock name
# @param token [String, nil] owner token; only forwarded to the Redis backend
# @return [Object] the result of +release_redis+ or +release_postgres+,
#   or +false+ when +backend+ reports neither :redis nor :postgres
def release(name:, token: nil)
  active = backend
  return release_redis(name: name, token: token) if active == :redis
  return release_postgres(name: name) if active == :postgres

  false
end

.release_postgres(name:) ⇒ Object



159
160
161
162
163
164
165
166
167
168
# File 'lib/legion/cluster/lock.rb', line 159

# Releases the PostgreSQL advisory lock for +name+.
#
# The lock id is the integer produced by +lock_key+.
#
# @param name [#to_s] lock name
# @return [Object] the +released+ column of the first result row;
#   +false+ when there is no connection or any StandardError is raised
def release_postgres(name:)
  advisory_id = lock_key(name)
  conn = Legion::Data.connection
  return false unless conn

  row = conn.fetch('SELECT pg_advisory_unlock(?) AS released', advisory_id).first
  row[:released]
rescue StandardError => e
  if defined?(Legion::Logging)
    Legion::Logging.debug "Lock#release_postgres failed for name=#{name}: #{e.message}"
  end
  false
end

.release_redis(name:, token:) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/legion/cluster/lock.rb', line 105

# Deletes the Redis lock for +name+, but only if the caller still owns it.
#
# Ownership is checked and the key removed inside one Lua script, so the
# compare and the DEL happen in a single EVAL call. The locally remembered
# token is dropped after the call regardless of the script's reply.
#
# @param name [#to_s] lock name
# @param token [String, nil] owner token; falls back to +fetch_token(name)+
# @return [Boolean] +true+ when the script replied 1; +false+ when no token
#   is known, the stored value did not match, or any StandardError is raised
def release_redis(name:, token:)
  redis = Legion::Cache::Redis.client
  owner_token = token || fetch_token(name)
  return false unless owner_token

  compare_and_delete = <<~LUA
    if redis.call('GET', KEYS[1]) == ARGV[1] then
      redis.call('DEL', KEYS[1])
      return 1
    else
      return 0
    end
  LUA
  reply = redis.call('EVAL', compare_and_delete, 1, redis_key(name), owner_token)
  delete_token(name)
  reply == 1
rescue StandardError => e
  if defined?(Legion::Logging)
    Legion::Logging.debug "Lock#release_redis failed for name=#{name}: #{e.message}"
  end
  false
end

.store_token(name, token) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/legion/cluster/lock.rb', line 170

# Remembers +token+ as the owner token for +name+.
#
# When Concurrent::Map is defined the store is written directly; otherwise
# the write is wrapped in @tokens_mutex.
#
# @param name [#to_s] lock name (stored under its string form)
# @param token [Object] token to remember
# @return [Object] the token that was stored
def store_token(name, token)
  key = name.to_s
  return @tokens[key] = token if defined?(Concurrent::Map)

  @tokens_mutex.synchronize { @tokens[key] = token }
end

.tokens ⇒ Object



17
18
19
# File 'lib/legion/cluster/lock.rb', line 17

# Exposes the token store held in @tokens.
#
# The store is not created here; +store_token+, +fetch_token+ and
# +delete_token+ read and write it under string keys.
#
# @return [Object] the current value of @tokens
def tokens
  @tokens
end

.with_lock(name:, ttl: 30, timeout: 5) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/legion/cluster/lock.rb', line 70

# Runs the given block while holding the lock called +name+.
#
# The lock is released in an +ensure+, so it is given up even when the block
# raises. Nothing is yielded when the lock could not be acquired.
#
# @param name [#to_s] lock name
# @param ttl [Numeric] forwarded to +acquire+
# @param timeout [Numeric] forwarded to +acquire+
# @yield the work to perform while the lock is held
# @return [Object, nil] the block's value, or +nil+ when +acquire+ returned
#   a falsy value
def with_lock(name:, ttl: 30, timeout: 5)
  outcome = acquire(name: name, ttl: ttl, timeout: timeout)
  return unless outcome

  # A bare `true` carries no token; any other truthy result from acquire is
  # handed back to release as the owner token.
  token = outcome unless outcome == true

  begin
    yield
  ensure
    release(name: name, token: token)
  end
end