Module: RaceGuard::Distributed::Runner

Defined in:
lib/race_guard/distributed/runner.rb

Overview

Orchestrates claim / yield / release and reporting.

Note: the following RuboCop directives were picked up from comments in the source file; they are linter configuration, not part of the module description:

  • rubocop:disable Metrics/ModuleLength, Metrics/ClassLength – Epic 10 runner
  • rubocop:disable Layout/LineLength, Metrics/ParameterLists, Metrics/MethodLength – Epic 10 runner
  • rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity – Epic 10 runner

Constant Summary collapse

HELD_STACK_KEY =
:__race_guard_distributed_held_stack__

Class Method Summary collapse

Class Method Details

.base_context(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:, store_class:) ⇒ Object



284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/race_guard/distributed/runner.rb', line 284

# Builds the string-keyed context hash shared by every distributed_guard report.
#
# Entries whose value is nil are removed from the result, so optional fields
# (for example a missing token or store class) simply do not appear.
#
# @param event [Object] stored under 'event'
# @param lock_name [Object] stored under 'lock_name'
# @param lock_key [Object] stored under 'lock_key'
# @param resource_digest [Object] stored under 'resource_hash'
# @param token [Object, nil] passed through token_hash and stored under 'owner_token_hash'
# @param ttl [Object] stored under 'ttl'
# @param caller_line [Object] stored under 'caller'
# @param store_class [Object, nil] stored under 'store_class'
# @return [Hash{String => Object}] context without nil-valued entries
def base_context(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:, store_class:)
  context = {}
  context['event'] = event
  context['lock_name'] = lock_name
  context['lock_key'] = lock_key
  context['resource_hash'] = resource_digest
  # The token is run through token_hash; the raw value is not stored.
  context['owner_token_hash'] = token_hash(token)
  context['ttl'] = ttl
  context['caller'] = caller_line
  context['store_class'] = store_class
  context.compact
end

.effective_skip_behavior(on_skip, cfg) ⇒ Object



310
311
312
# File 'lib/race_guard/distributed/runner.rb', line 310

# Resolves the skip behavior to use: the per-call +on_skip+ when it is not nil,
# otherwise the configuration's +distributed_skip_behavior+.
#
# @param on_skip [#to_sym, nil] per-call override
# @param cfg [#distributed_skip_behavior] consulted only when +on_skip+ is nil
# @return [Symbol] the chosen behavior converted with +to_sym+
def effective_skip_behavior(on_skip, cfg)
  chosen = on_skip
  chosen = cfg.distributed_skip_behavior if chosen.nil?
  chosen.to_sym
end

.effective_store(cfg) ⇒ Object



297
298
299
300
# File 'lib/race_guard/distributed/runner.rb', line 297

# Picks the lock store to use for this configuration.
#
# An explicitly configured +distributed_lock_store+ wins. Otherwise, when a
# +distributed_redis_client+ is configured, it is wrapped in a new RedisLockStore.
#
# @param cfg [#distributed_lock_store, #distributed_redis_client]
# @return [Object, nil, false] the store, or the falsy client value when neither is configured
def effective_store(cfg)
  explicit_store = cfg.distributed_lock_store
  return explicit_store if explicit_store

  redis_client = cfg.distributed_redis_client
  redis_client && RedisLockStore.new(redis_client)
end

.emit_claimed(lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/race_guard/distributed/runner.rb', line 168

# Reports a 'claimed' lifecycle event for a lock by delegating to emit_lifecycle
# (which applies its own default severity).
#
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param token [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object]
# @return [Object] whatever emit_lifecycle returns
def emit_claimed(lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class)
  claimed_fields = {
    event: 'claimed',
    lock_name: lock_name,
    lock_key: lock_key,
    resource_digest: resource_digest,
    token: token,
    ttl: ttl,
    caller_line: caller_line,
    store_class: store_class
  }
  emit_lifecycle(**claimed_fields)
end

.emit_error(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:, store_class:, error:, extra: {}) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/race_guard/distributed/runner.rb', line 252

# Reports an error-type distributed_guard event through RaceGuard.report.
#
# Does nothing (returns nil) unless the RaceGuard configuration is active.
# The report context is base_context plus the stringified +extra+ entries and,
# when +error+ is given, 'error_class' and 'error_message' (message truncated to
# its first 500 characters). Severity comes from
# +configuration.severity_for(:distributed_guard)+.
#
# @param event [Object] event name, also interpolated into the report message
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param token [Object, nil]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object, nil]
# @param error [Exception, nil] error to describe in the context, if any
# @param extra [Hash] additional context entries; keys are stringified
# @return [Object, nil] the result of RaceGuard.report, or nil when inactive
def emit_error(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:, store_class:,
               error:, extra: {})
  return unless RaceGuard.configuration.active?

  details = base_context(
    event: event, lock_name: lock_name, lock_key: lock_key, resource_digest: resource_digest,
    token: token, ttl: ttl, caller_line: caller_line, store_class: store_class
  )
  details.merge!(stringify_keys(extra))
  if error
    # Error fields are written after +extra+, so they take precedence over it.
    details['error_class'] = error.class.name
    details['error_message'] = error.message.to_s[0, 500]
  end

  RaceGuard.report(
    detector: 'distributed_guard',
    message: "distributed_guard:#{event}",
    severity: RaceGuard.configuration.severity_for(:distributed_guard),
    context: details
  )
end

.emit_lifecycle(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:, store_class: nil, extra: {}, severity: :info) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/race_guard/distributed/runner.rb', line 229

# Reports a lifecycle distributed_guard event through RaceGuard.report.
#
# Does nothing (returns nil) unless the RaceGuard configuration is active.
# The report context is base_context merged with the stringified +extra+ entries.
#
# @param event [Object] event name, also interpolated into the report message
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param token [Object, nil]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object, nil] defaults to nil
# @param extra [Hash] additional context entries; keys are stringified
# @param severity [Object] severity handed to RaceGuard.report (defaults to :info)
# @return [Object, nil] the result of RaceGuard.report, or nil when inactive
def emit_lifecycle(event:, lock_name:, lock_key:, resource_digest:, token:, ttl:, caller_line:,
                   store_class: nil, extra: {}, severity: :info)
  return unless RaceGuard.configuration.active?

  # merge! returns the receiver, so +payload+ is the base context with extras applied.
  payload = base_context(
    event: event, lock_name: lock_name, lock_key: lock_key, resource_digest: resource_digest,
    token: token, ttl: ttl, caller_line: caller_line, store_class: store_class
  ).merge!(stringify_keys(extra))

  RaceGuard.report(
    detector: 'distributed_guard',
    message: "distributed_guard:#{event}",
    severity: severity,
    context: payload
  )
end

.emit_skipped_lost_race(lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class, on_skip, cfg) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/race_guard/distributed/runner.rb', line 152

# Reports a 'skipped' lifecycle event with skip_reason 'lost_race', i.e. the
# claim attempt did not obtain the lock.
#
# The resolved skip behavior (see effective_skip_behavior) is recorded as a
# string under 'skip_behavior'.
#
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param token [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object]
# @param on_skip [#to_sym, nil] per-call skip behavior override
# @param cfg [Object] configuration used to resolve the default skip behavior
# @return [Object] whatever emit_lifecycle returns
def emit_skipped_lost_race(lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class,
                           on_skip, cfg)
  behavior_label = effective_skip_behavior(on_skip, cfg).to_s
  skip_details = { 'skip_reason' => 'lost_race', 'skip_behavior' => behavior_label }

  emit_lifecycle(
    event: 'skipped', lock_name: lock_name, lock_key: lock_key, resource_digest: resource_digest,
    token: token, ttl: ttl, caller_line: caller_line, store_class: store_class, extra: skip_details
  )
end

.emit_skipped_reentrant(lock_name, lock_key, resource_digest, ttl, caller_line, on_skip, cfg) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/race_guard/distributed/runner.rb', line 137

# Reports a 'skipped' lifecycle event with skip_reason 'reentrant'.
#
# No token or store class is involved on this path, so both are reported as nil.
# The resolved skip behavior (see effective_skip_behavior) is recorded as a
# string under 'skip_behavior'.
#
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @param on_skip [#to_sym, nil] per-call skip behavior override
# @param cfg [Object] configuration used to resolve the default skip behavior
# @return [Object] whatever emit_lifecycle returns
def emit_skipped_reentrant(lock_name, lock_key, resource_digest, ttl, caller_line, on_skip, cfg)
  behavior_label = effective_skip_behavior(on_skip, cfg).to_s
  skip_details = { 'skip_reason' => 'reentrant', 'skip_behavior' => behavior_label }

  emit_lifecycle(
    event: 'skipped', lock_name: lock_name, lock_key: lock_key, resource_digest: resource_digest,
    token: nil, ttl: ttl, caller_line: caller_line, store_class: nil, extra: skip_details
  )
end

.finalize_release(store, lock_key, token, lock_name, resource_digest, ttl, caller_line, store_class) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/race_guard/distributed/runner.rb', line 181

# Releases the lock in the store and reports the outcome.
#
# Three outcomes are distinguished:
# * +store.release+ raises a StandardError -> emit_error with event 'release_failed'
# * +store.release+ returns a truthy value -> emit_lifecycle with event 'released'
# * +store.release+ returns a falsy value  -> emit_lifecycle with event
#   'release_failed', severity :warn and an explanatory note
#
# A StandardError raised by the store is captured and reported, not re-raised.
#
# @param store [#release] lock store; called as +release(key:, token:)+
# @param lock_key [Object]
# @param token [Object]
# @param lock_name [Object]
# @param resource_digest [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object]
# @return [Object] whatever the selected emit_* call returns
def finalize_release(store, lock_key, token, lock_name, resource_digest, ttl, caller_line, store_class)
  # Fields common to every report this method can emit.
  shared = {
    lock_name: lock_name,
    lock_key: lock_key,
    resource_digest: resource_digest,
    token: token,
    ttl: ttl,
    caller_line: caller_line,
    store_class: store_class
  }

  failure = nil
  removed = begin
    store.release(key: lock_key, token: token)
  rescue StandardError => e
    failure = e
    false
  end

  # Reporting happens after the rescue block has finished, not inside it.
  return emit_error(event: 'release_failed', error: failure, **shared) if failure
  return emit_lifecycle(event: 'released', **shared) if removed

  emit_lifecycle(
    event: 'release_failed',
    extra: { 'note' => 'compare-and-delete did not remove key (expired or stolen)' },
    severity: :warn,
    **shared
  )
end

.handle_skip(_block, on_skip, default_behavior) ⇒ Object



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/race_guard/distributed/runner.rb', line 314

# Produces the result for a skipped run according to the requested behavior.
#
# The behavior is +on_skip+ when it is not nil, otherwise +default_behavior+;
# either is converted with +to_sym+ before being interpreted.
#
# @param _block [Proc] unused
# @param on_skip [#to_sym, nil] per-call behavior override
# @param default_behavior [#to_sym] behavior used when +on_skip+ is nil
# @return [nil, Object] nil for :nil, the SKIPPED constant for :sentinel
# @raise [LockNotAcquiredError] when the behavior is :raise
# @raise [ArgumentError] when the behavior is none of :nil, :sentinel, :raise
def handle_skip(_block, on_skip, default_behavior)
  requested = on_skip.nil? ? default_behavior : on_skip
  mode = requested.to_sym

  return nil if mode == :nil
  return SKIPPED if mode == :sentinel
  raise LockNotAcquiredError, 'distributed lock not acquired' if mode == :raise

  raise ArgumentError,
        "invalid on_skip / distributed_skip_behavior: #{mode.inspect} " \
        '(expected :nil, :sentinel, or :raise)'
end

.run(name:, ttl:, resource:, on_skip:, &block) ⇒ Object

Raises:

  • (ArgumentError)


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/race_guard/distributed/runner.rb', line 59

# Runs the block at most once across processes by claiming a lock in the
# configured store, yielding, and releasing the lock afterwards.
#
# Flow, in order:
# 1. When the :distributed_guard feature is disabled or the configuration is
#    inactive, the block is simply called (no TTL validation, no locking).
# 2. +ttl+ is validated and the lock key / resource digest are built.
# 3. If reentrancy is configured as :skip and this key is already on the
#    current held stack (Thread.current[HELD_STACK_KEY]), the run is skipped.
# 4. If no store is available, run_misconfigured_store decides the outcome.
# 5. A claim is attempted; a store error is delegated to run_store_error and a
#    falsy claim result is treated as a lost race (skip).
# 6. On success the claim is reported, the key is pushed on the held stack, the
#    block is called (receiving a LockControl when it takes an argument), and
#    the lock is released via finalize_release.
#
# Every step after a successful claim runs inside the begin/ensure, so an
# exception raised while reporting the claim or building the LockControl still
# releases the lock instead of leaving it held until its TTL expires.
#
# @param name [#to_s] lock name
# @param ttl [Integer] lock lifetime in seconds; must be positive
# @param resource [Object] resource identity passed to KeyBuilder
# @param on_skip [#to_sym, nil] per-call skip behavior; nil uses the configured default
# @yield [control] the guarded work; receives a LockControl (or nil when the
#   guard is bypassed) if the block declares a parameter
# @return [Object] the block's result, or the skip result produced by handle_skip
# @raise [ArgumentError] when no block is given or +ttl+ is not a positive Integer
def run(name:, ttl:, resource:, on_skip:, &block)
  raise ArgumentError, 'RaceGuard.distributed_once requires a block' unless block

  cfg = RaceGuard.configuration
  lock_name = name.to_s
  # Captured here, directly in this method, because safe_caller resolves a
  # frame at a fixed depth relative to its own caller.
  caller_line = safe_caller

  return yield_simple(block, nil) if !cfg.enabled?(:distributed_guard) || !cfg.active?

  unless ttl.is_a?(Integer) && ttl.positive?
    raise ArgumentError, 'ttl must be a positive Integer (seconds)'
  end

  prefix = cfg.distributed_key_prefix
  lock_key = KeyBuilder.build(name: lock_name, resource: resource, prefix: prefix)
  resource_digest = KeyBuilder.resource_digest(resource)

  stack = Thread.current[HELD_STACK_KEY] ||= []
  if cfg.distributed_reentrancy == :skip && stack.include?(lock_key)
    emit_skipped_reentrant(
      lock_name, lock_key, resource_digest, ttl, caller_line, on_skip, cfg
    )
    return handle_skip(block, on_skip, cfg.distributed_skip_behavior)
  end

  store = effective_store(cfg)
  if store.nil?
    return run_misconfigured_store(cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl,
                                   caller_line)
  end

  store_class = store.class.name
  token = SecureRandom.hex(16)
  res, payload = try_claim(store, lock_key, token, ttl.to_i)
  if res == :err
    return run_store_error(
      cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl, caller_line, store_class, payload
    )
  end

  claimed = payload
  unless claimed
    emit_skipped_lost_race(
      lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class, on_skip, cfg
    )
    return handle_skip(block, on_skip, cfg.distributed_skip_behavior)
  end

  # The lock is held from this point on: everything below must be covered by
  # the ensure so that a failure cannot leak the lock.
  pushed = false
  begin
    emit_claimed(lock_name, lock_key, resource_digest, token, ttl, caller_line, store_class)
    stack << lock_key
    pushed = true
    control = LockControl.new(
      {
        store: store,
        lock_key: lock_key,
        token: token,
        ttl: ttl.to_i,
        lock_name: lock_name,
        resource_digest: resource_digest,
        caller_line: caller_line,
        store_class: store_class
      }
    )
    yield_simple(block, control)
  ensure
    # Only undo the push this call made; an entry pushed by an outer call stays.
    stack.pop if pushed && stack.last == lock_key
    finalize_release(
      store, lock_key, token, lock_name, resource_digest, ttl, caller_line, store_class
    )
  end
end

.run_misconfigured_store(cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl, caller_line) ⇒ Object



330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/race_guard/distributed/runner.rb', line 330

# Handles a run for which no lock store is available.
#
# When +cfg.distributed_degrade_silently+ is truthy the block is simply called
# without a control object. Otherwise a 'configuration_error' event is reported
# (reason 'missing_lock_store') and the run is treated as skipped.
#
# @param cfg [Object] configuration
# @param block [Proc] the guarded work
# @param on_skip [#to_sym, nil] per-call skip behavior override
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @return [Object] the block's result when degrading silently, else the handle_skip result
def run_misconfigured_store(cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl, caller_line)
  if cfg.distributed_degrade_silently
    yield_simple(block, nil)
  else
    diagnosis = {
      'reason' => 'missing_lock_store',
      'hint' => 'set distributed_lock_store or distributed_redis_client'
    }
    emit_error(
      event: 'configuration_error', lock_name: lock_name, lock_key: lock_key,
      resource_digest: resource_digest, token: nil, ttl: ttl, caller_line: caller_line,
      store_class: nil, error: nil, extra: diagnosis
    )
    handle_skip(block, on_skip, cfg.distributed_skip_behavior)
  end
end

.run_store_error(cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl, caller_line, store_class, error) ⇒ Object



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/race_guard/distributed/runner.rb', line 351

# Handles a run whose claim attempt failed with a store error.
#
# When +cfg.distributed_degrade_silently+ is truthy the block is simply called
# without a control object. Otherwise a 'redis_error' event carrying +error+ is
# reported (with a nil token) and the run is treated as skipped.
#
# @param cfg [Object] configuration
# @param block [Proc] the guarded work
# @param on_skip [#to_sym, nil] per-call skip behavior override
# @param lock_name [Object]
# @param lock_key [Object]
# @param resource_digest [Object]
# @param ttl [Object]
# @param caller_line [Object]
# @param store_class [Object]
# @param error [Exception] the error captured during the claim attempt
# @return [Object] the block's result when degrading silently, else the handle_skip result
def run_store_error(cfg, block, on_skip, lock_name, lock_key, resource_digest, ttl, caller_line,
                    store_class, error)
  if cfg.distributed_degrade_silently
    yield_simple(block, nil)
  else
    emit_error(
      event: 'redis_error', lock_name: lock_name, lock_key: lock_key,
      resource_digest: resource_digest, token: nil, ttl: ttl, caller_line: caller_line,
      store_class: store_class, error: error
    )
    handle_skip(block, on_skip, cfg.distributed_skip_behavior)
  end
end

.safe_caller ⇒ Object



369
370
371
372
# File 'lib/race_guard/distributed/runner.rb', line 369

# Describes the call site two frames above this method, formatted as
# "path:lineno:in `label'".
#
# @return [String] the formatted location, or '' when no such frame exists
def safe_caller
  # caller_locations must be invoked directly here (not inside a block or
  # helper): the start index 2 is relative to this method's own frame.
  frame = (caller_locations(2, 1) || []).first
  return '' if frame.nil?

  [frame.path, frame.lineno, "in `#{frame.label}'"].join(':')
end

.stringify_keys(hash) ⇒ Object



374
375
376
# File 'lib/race_guard/distributed/runner.rb', line 374

# Returns a new hash with every key converted via +to_s+; values are untouched.
# When two keys stringify to the same value, the later entry wins.
#
# @param hash [#to_h] hash (or hash-convertible object) to copy
# @return [Hash{String => Object}]
def stringify_keys(hash)
  hash.to_h.each_with_object({}) do |(key, value), stringified|
    stringified[key.to_s] = value
  end
end

.token_hash(token) ⇒ Object



278
279
280
281
282
# File 'lib/race_guard/distributed/runner.rb', line 278

# Derives a short digest of the owner token for use in reports.
#
# @param token [#to_s, nil] owner token
# @return [String, nil] the first 32 hex characters of the SHA-256 digest of
#   +token.to_s+, or nil when +token+ is nil or false
def token_hash(token)
  if token
    Digest::SHA256.hexdigest(token.to_s).slice(0, 32)
  else
    nil
  end
end

.try_claim(store, lock_key, token, ttl) ⇒ Object



131
132
133
134
135
# File 'lib/race_guard/distributed/runner.rb', line 131

# Attempts to claim the lock and converts the outcome into a tagged pair, so
# callers can branch on the tag instead of rescuing.
#
# @param store [#claim] lock store; called as +claim(key:, token:, ttl:)+
# @param lock_key [Object]
# @param token [Object]
# @param ttl [Object]
# @return [Array] +[:ok, result]+ with the store's return value, or
#   +[:err, exception]+ when the store raised a StandardError
def try_claim(store, lock_key, token, ttl)
  outcome = store.claim(key: lock_key, token: token, ttl: ttl)
rescue StandardError => failure
  [:err, failure]
else
  [:ok, outcome]
end

.yield_simple(block, control) ⇒ Object



302
303
304
305
306
307
308
# File 'lib/race_guard/distributed/runner.rb', line 302

# Calls the block, passing +control+ only when the block declares parameters
# (i.e. its arity is not zero).
#
# @param block [Proc] the block to call
# @param control [Object, nil] value handed to blocks that accept an argument
# @return [Object] the block's return value
def yield_simple(block, control)
  return block.call if block.arity.zero?

  block.call(control)
end