Module: RubyReactor::Storage::RedisOrderedLocking

Included in:
RedisAdapter
Defined in:
lib/ruby_reactor/storage/redis_ordered_locking.rb

Overview

Ordered Lock Primitives: used by `with_ordered_lock` to enforce strict transaction ordering via a monotonically increasing nonce assigned at enqueue time. See `RubyReactor::OrderedLock`.

Storage layout (hash-tagged so a Redis cluster keeps them on the same shard):

STRING  ordered_lock:{<key>}:next            — last-assigned nonce
STRING  ordered_lock:{<key>}:last_completed  — last-advanced nonce
HASH    ordered_lock:{<key>}:assigned_at     — { nonce => unix_ts }
STRING  ordered_lock:{<key>}:first_failed    — nonce of the FIRST run
                                               whose terminal status
                                               was Failure (strict mode
                                               poison marker; 0 / unset
                                               if no failure yet).
STRING  ordered_lock:{<key>}:epoch            — generation counter,
                                               bumped each time a fresh
                                               batch starts (nonce 1).
                                               Captured at assign and
                                               carried by every gate /
                                               advance call so a stale
                                               straggler from a drained
                                               batch (whose nonce numbers
                                               the next batch reuses)
                                               is fenced out as a no-op.

When `last_completed == next`, the next/last_completed/assigned_at/first_failed keys are garbage-collected by the ADVANCE script (the SKIP script performs the same cleanup) and the next assign starts at 1 again, so a fresh batch always starts un-poisoned. The `epoch` key is deliberately NOT GC'd (it only TTL-expires when fully idle) so the generation keeps incrementing across back-to-back batches.
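The batch lifecycle described above can be modeled in a few lines of plain Ruby. This is only an illustrative sketch: `BatchModel` is a hypothetical in-memory stand-in that is not part of RubyReactor, and it ignores TTLs, failures, and out-of-order advances.

```ruby
# Hypothetical in-memory model of the counters (no Redis, no TTLs).
class BatchModel
  attr_reader :epoch

  def initialize
    @next = nil   # mirrors ordered_lock:{key}:next (nil = key absent)
    @last = nil   # mirrors ordered_lock:{key}:last_completed
    @epoch = 0    # mirrors ordered_lock:{key}:epoch (never GC'd in this model)
  end

  # Mirrors the assign step: increment next; nonce 1 bumps the generation.
  def assign
    @next = (@next || 0) + 1
    @last ||= 0
    @epoch += 1 if @next == 1
    [@next, @epoch]
  end

  # Mirrors an in-order advance followed by the drain check.
  def advance(nonce)
    return @last.to_i unless @last && nonce == @last + 1

    @last = nonce
    done = @last
    @next = @last = nil if done >= @next # batch drained: GC the counters
    done
  end

  def drained?
    @next.nil? && @last.nil?
  end
end

model = BatchModel.new
first_batch = [model.assign, model.assign]   # [[1, 1], [2, 1]]
model.advance(1)
model.advance(2)                             # drains and GCs the batch
second_batch = model.assign                  # [1, 2]: numbering restarts, epoch bumps
```

The second batch reuses nonce 1 but carries epoch 2, which is what lets a straggler still holding `[1, 1]` be told apart from the new owner of nonce 1.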

Constant Summary

ASSIGN_SCRIPT =

<<~LUA
  local next_key  = KEYS[1]
  local last_key  = KEYS[2]
  local at_key    = KEYS[3]
  local epoch_key = KEYS[4]
  local now       = ARGV[1]
  local ttl       = tonumber(ARGV[2])

  local nonce = redis.call('incr', next_key)
  redis.call('expire', next_key, ttl)

  if redis.call('exists', last_key) == 0 then
    redis.call('set', last_key, 0, 'EX', ttl)
  else
    redis.call('expire', last_key, ttl)
  end

  -- nonce == 1 means `next` was absent (first ever, or GC'd after a full
  -- drain), so this is the start of a fresh batch -> bump the generation.
  -- Later nonces in the same batch read the epoch the nonce-1 caller set.
  local epoch
  if nonce == 1 then
    epoch = redis.call('incr', epoch_key)
  else
    epoch = tonumber(redis.call('get', epoch_key) or '1')
  end
  redis.call('expire', epoch_key, ttl)

  redis.call('hset', at_key, nonce, now)
  redis.call('expire', at_key, ttl)
  return {nonce, epoch}
LUA
CAN_PROCEED_SCRIPT =
<<~LUA
  local next_key  = KEYS[1]
  local last_key  = KEYS[2]
  local at_key    = KEYS[3]
  local fail_key  = KEYS[4]
  local epoch_key = KEYS[5]
  local my        = tonumber(ARGV[1])
  local now       = tonumber(ARGV[2])
  local pp        = tonumber(ARGV[3])
  local my_epoch  = tonumber(ARGV[4] or '0')

  local last = tonumber(redis.call('get', last_key) or '0')
  local first_failed = tonumber(redis.call('get', fail_key) or '0')

  -- Stale-batch fence: a caller carrying an epoch from a drained batch
  -- (whose nonce numbers the current batch reused) must not gate against
  -- or poison-advance this batch. my_epoch == 0 is a legacy/no-epoch
  -- caller (e.g. an in-flight job from before this field existed) — skip.
  local cur_epoch = tonumber(redis.call('get', epoch_key) or '0')
  if my_epoch > 0 and my_epoch ~= cur_epoch then
    return {'stale', 0, last, first_failed}
  end

  -- Drained-batch fence: both counters absent means this caller's batch
  -- fully drained and was GC'd (or wholly TTL-expired) while it slept —
  -- e.g. a poison-passed straggler waking between the drain and the next
  -- batch's first assign (epoch not yet bumped, so the stale fence can't
  -- catch it). It may run late (poison semantics) but must NOT enter the
  -- poison loop below: SET on the missing last_key would resurrect the
  -- cursor with no TTL and let every nonce of the NEXT batch gate
  -- straight through. Only both-absent is conclusive — a mid-batch
  -- next_key TTL hiccup leaves last_key in place and proceeds normally.
  -- Returns a DISTINCT 'drained_go' (not plain 'go') so the executor can
  -- tell this apart: a genuine late straggler should run, but a Sidekiq
  -- at-least-once redelivery of an already-terminal context must NOT
  -- re-execute. The executor consults the stored context status to decide.
  if redis.call('exists', next_key) == 0 and redis.call('exists', last_key) == 0 then
    return {'drained_go', 0, last, first_failed}
  end

  -- Liveness heartbeat: a gate check proves this caller is alive (about
  -- to run, or snoozing on lock/rate contention and re-checking), so
  -- restamp its own assigned_at. assigned_at is otherwise set once at
  -- ENQUEUE, meaning a job that merely sat in a deep queue longer than
  -- poison_pill_timeout would be poison-passed the moment a successor
  -- gates — this restamp gives it a full pp window from the time it
  -- actually starts. hexists guard: never resurrect an entry that an
  -- out-of-order terminal advance already deleted.
  if my > last and redis.call('hexists', at_key, my) == 1 then
    redis.call('hset', at_key, my, now)
  end

  if my <= last then
    return {'go', 0, last, first_failed}
  end

  if my == last + 1 then
    return {'go', 0, last, first_failed}
  end

  -- Drain consecutive poisoned blockers in one shot. Without this loop a
  -- cluster of N dead blockers takes N snooze rounds to clear (one round
  -- per blocker); with it, a single can_proceed call sweeps them all.
  -- Bounded by `my` so the loop runs at most O(stream length) per call.
  local advanced_via_poison = false
  while last + 1 < my do
    local blocker = last + 1
    local at = tonumber(redis.call('hget', at_key, blocker) or '0')
    -- Only a blocker with a recent assigned_at timestamp is genuinely in
    -- flight; stop draining there. `at == 0` means the timer is gone (an
    -- out-of-order advance deleted it, or the assigned_at hash expired),
    -- so the blocker can never make progress on its own — advance past it
    -- rather than stalling forever (the original `break` here was a
    -- permanent head-of-line hang, the exact thing poison-pill prevents).
    if at > 0 and (now - at) <= pp then
      break
    end
    redis.call('set', last_key, blocker, 'KEEPTTL')
    redis.call('hdel', at_key, blocker)
    last = blocker
    advanced_via_poison = true
  end

  if my <= last then
    return {advanced_via_poison and 'poison_advance' or 'go', 0, last, first_failed}
  end

  if my == last + 1 then
    return {advanced_via_poison and 'poison_advance' or 'go', 0, last, first_failed}
  end

  local blocker = last + 1
  local blocker_assigned = tonumber(redis.call('hget', at_key, blocker) or '0')
  local hint
  if blocker_assigned > 0 then
    hint = pp - (now - blocker_assigned)
  else
    hint = pp
  end
  if hint < 1 then hint = 1 end
  return {'wait', hint, last, first_failed}
LUA
ADVANCE_SCRIPT =
<<~LUA
  local next_key  = KEYS[1]
  local last_key  = KEYS[2]
  local at_key    = KEYS[3]
  local fail_key  = KEYS[4]
  local epoch_key = KEYS[5]
  local my        = tonumber(ARGV[1])
  local failed    = tonumber(ARGV[2]) == 1
  local ttl       = tonumber(ARGV[3])
  local my_epoch  = tonumber(ARGV[4] or '0')

  -- Stale-batch fence: an advance carrying an epoch from a drained batch
  -- whose nonce numbers were reused must not mutate the current batch's
  -- counters. This is the core protection against a slow straggler from a
  -- prior batch corrupting a later one. my_epoch == 0 = legacy caller.
  local cur_epoch = tonumber(redis.call('get', epoch_key) or '0')
  if my_epoch > 0 and my_epoch ~= cur_epoch then
    return tonumber(redis.call('get', last_key) or '0')
  end

  -- Drained-batch fence (mirrors CAN_PROCEED): a late terminal from a
  -- batch that already drained and GC'd must be a complete no-op. Without
  -- it, an in-order straggler advance (my == 0 + 1) would SET last_key
  -- with KEEPTTL on a missing key — resurrecting a TTL-less cursor that
  -- un-gates every nonce of the next batch — and a failed straggler
  -- would write fail_key, strict-poisoning a batch that hasn't started.
  if redis.call('exists', next_key) == 0 and redis.call('exists', last_key) == 0 then
    return 0
  end

  local last = tonumber(redis.call('get', last_key) or '0')

  -- Record the chain poison marker for ANY terminal failure ahead of the
  -- cursor (my > last), not just the in-order successor; strict-mode
  -- chain-skip relies on the SMALLEST failed nonce being recorded. A
  -- failure at or behind the cursor (my <= last) is deliberately NOT
  -- recorded: that guard is what keeps a Sidekiq duplicate redelivery of
  -- an already-terminated failure from re-poisoning a chain that moved
  -- on. Cost of the trade-off: a run the poison-advance already passed
  -- (cursor moved beyond it) that later fails does NOT poison the chain
  -- — by then ordering was already ceded for that nonce and successors
  -- may have run.
  if failed and my > last then
    local existing = tonumber(redis.call('get', fail_key) or '0')
    if existing == 0 or my < existing then
      redis.call('set', fail_key, my, 'EX', ttl)
    end
  end

  if my == last + 1 then
    redis.call('set', last_key, my, 'KEEPTTL')
    redis.call('hdel', at_key, my)
    last = my

    local nxt = tonumber(redis.call('get', next_key) or '0')
    -- Guard with `nxt > 0` (mirroring SKIP_SCRIPT): a missing/expired
    -- next_key reads as 0 and `last >= 0` would otherwise GC live
    -- counters mid-sequence, resetting numbering and dropping the marker.
    if last >= nxt and nxt > 0 then
      redis.call('del', next_key)
      redis.call('del', last_key)
      redis.call('del', at_key)
      redis.call('del', fail_key)
    end
    return last
  end

  redis.call('hdel', at_key, my)
  return last
LUA
HEARTBEAT_SCRIPT =

Liveness restamp for a nonce that is actively executing its steps. The CAN_PROCEED heartbeat only fires when a job runs its gate; a blocker that passed the gate and is now running long steps never re-gates, so without this a successor would poison-advance past a still-running blocker once its steps outlast poison_pill_timeout — a silent ordering violation. A background thread calls this every pp/3 seconds while steps run.

Guards: epoch-fenced (a stale-batch straggler must not touch the current batch) and hexists-guarded (never resurrect a timer a terminal advance already deleted — if we were already poison-passed, stay passed).

<<~LUA
  local at_key    = KEYS[1]
  local epoch_key = KEYS[2]
  local my        = ARGV[1]
  local now       = ARGV[2]
  local my_epoch  = tonumber(ARGV[3] or '0')

  local cur_epoch = tonumber(redis.call('get', epoch_key) or '0')
  if my_epoch > 0 and my_epoch ~= cur_epoch then
    return 0
  end

  if redis.call('hexists', at_key, my) == 1 then
    redis.call('hset', at_key, my, now)
    return 1
  end
  return 0
LUA
SKIP_SCRIPT =
<<~LUA
  local next_key = KEYS[1]
  local last_key = KEYS[2]
  local at_key   = KEYS[3]
  local fail_key = KEYS[4]
  local my       = tonumber(ARGV[1])

  -- Drained-batch fence (mirrors CAN_PROCEED/ADVANCE): an ops `skip!` of a
  -- nonce whose batch already drained must not SET last_key on a missing
  -- key — KEEPTTL on an absent key would create a TTL-less cursor and
  -- un-gate the next batch. Nothing to skip in a drained batch anyway.
  if redis.call('exists', next_key) == 0 and redis.call('exists', last_key) == 0 then
    return 0
  end

  local last = tonumber(redis.call('get', last_key) or '0')

  if my > last then
    -- KEEPTTL: forcing the cursor forward must not strip the sequence TTL
    -- and leave a persistent key behind for a sequence that never drains.
    redis.call('set', last_key, my, 'KEEPTTL')
  end
  redis.call('hdel', at_key, my)

  last = tonumber(redis.call('get', last_key) or '0')
  local nxt = tonumber(redis.call('get', next_key) or '0')
  if last >= nxt and nxt > 0 then
    redis.call('del', next_key)
    redis.call('del', last_key)
    redis.call('del', at_key)
    redis.call('del', fail_key)
  end
  return last
LUA

Instance Method Summary

Instance Method Details

#ordered_lock_advance(key, nonce:, failed: false, epoch: 0, ttl: 86_400) ⇒ Object



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 327

def ordered_lock_advance(key, nonce:, failed: false, epoch: 0, ttl: 86_400)
  @redis.eval(
    ADVANCE_SCRIPT,
    keys: ordered_lock_keys(key),
    argv: [nonce.to_i, failed ? 1 : 0, ttl.to_i, epoch.to_i]
  ).to_i
end
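The failure-marker rule that ADVANCE_SCRIPT applies when `failed` is true can be restated as a pure function. This is a sketch for reasoning about the rule, not code from the library: `poison_marker_after` and its parameter names are hypothetical, and `0` stands for an unset `first_failed` key, as in the script.

```ruby
# Hypothetical pure-Ruby restatement of the marker rule in ADVANCE_SCRIPT.
# existing: current first_failed value (0 = unset); last: current cursor.
def poison_marker_after(existing:, last:, nonce:, failed:)
  # A success, or a failure at/behind the cursor, leaves the marker untouched.
  return existing unless failed && nonce > last
  # Otherwise keep the smallest failed nonce seen so far.
  return nonce if existing.zero? || nonce < existing

  existing
end

poison_marker_after(existing: 0, last: 2, nonce: 5, failed: true) # => 5
poison_marker_after(existing: 5, last: 2, nonce: 3, failed: true) # => 3
poison_marker_after(existing: 0, last: 4, nonce: 3, failed: true) # => 0
```

The last call is the duplicate-redelivery case from the script comments: the cursor has already moved past nonce 3, so its failure is not recorded.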

#ordered_lock_assign(key, ttl: 86_400, now: Time.now.to_i) ⇒ Object

Returns `[nonce, epoch]`: the assigned nonce plus the generation it belongs to. The caller stashes both so later gate/advance calls can be fenced if the batch drains and its numbers get reused.



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 310

def ordered_lock_assign(key, ttl: 86_400, now: Time.now.to_i)
  next_k, last_k, at_k, _fail_k, epoch_k = ordered_lock_keys(key)
  nonce, epoch = @redis.eval(
    ASSIGN_SCRIPT, keys: [next_k, last_k, at_k, epoch_k], argv: [now.to_s, ttl]
  )
  [nonce.to_i, epoch.to_i]
end

#ordered_lock_can_proceed(key, nonce:, poison_pill_timeout:, epoch: 0, now: Time.now.to_i) ⇒ Object



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 318

def ordered_lock_can_proceed(key, nonce:, poison_pill_timeout:, epoch: 0, now: Time.now.to_i)
  state, retry_after, last_completed, first_failed = @redis.eval(
    CAN_PROCEED_SCRIPT,
    keys: ordered_lock_keys(key),
    argv: [nonce.to_i, now.to_i, poison_pill_timeout.to_i, epoch.to_i]
  )
  [state.to_s, retry_after.to_i, last_completed.to_i, first_failed.to_i]
end
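To make the returned `[state, retry_after, last_completed, ...]` tuple easier to reason about, the gate arithmetic of CAN_PROCEED_SCRIPT can be ported to plain Ruby. The sketch below is an approximation under stated assumptions: `gate` is a hypothetical name, the epoch fence, drained-batch fence, and liveness restamp are left out, and the function returns the new cursor instead of writing it to Redis.

```ruby
# Hypothetical pure-Ruby port of the gate arithmetic in CAN_PROCEED_SCRIPT.
# assigned_at maps nonce => unix timestamp; pp is poison_pill_timeout.
# Returns [state, retry_after, last]; it does not mutate assigned_at.
def gate(nonce:, last:, assigned_at:, now:, pp:)
  return ["go", 0, last] if nonce <= last + 1

  swept = false
  while last + 1 < nonce
    at = assigned_at.fetch(last + 1, 0)
    break if at.positive? && (now - at) <= pp # blocker is still inside its window

    last += 1 # blocker timed out or has no timer: advance past it
    swept = true
  end
  return [swept ? "poison_advance" : "go", 0, last] if nonce <= last + 1

  # Still blocked: suggest waiting until the blocker's window runs out.
  at = assigned_at.fetch(last + 1, 0)
  hint = at.positive? ? pp - (now - at) : pp
  ["wait", [hint, 1].max, last]
end

gate(nonce: 4, last: 1, assigned_at: { 2 => 95, 3 => 96 }, now: 100, pp: 30)
# => ["wait", 25, 1]   (nonce 2 was stamped 5s ago, so 25s of its window remain)
gate(nonce: 4, last: 1, assigned_at: { 2 => 10, 3 => 20 }, now: 100, pp: 30)
# => ["poison_advance", 0, 3]   (both blockers exceeded the 30s window)
```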

#ordered_lock_heartbeat(key, nonce:, epoch: 0, now: Time.now.to_i) ⇒ Object

Restamp a running nonce's assigned_at to keep it from being poison-passed while its steps execute. Returns 1 if restamped, 0 if fenced (stale epoch) or the timer was already gone. `now` is injectable for testing.



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 342

def ordered_lock_heartbeat(key, nonce:, epoch: 0, now: Time.now.to_i)
  _next_k, _last_k, at_k, _fail_k, epoch_k = ordered_lock_keys(key)
  @redis.eval(
    HEARTBEAT_SCRIPT, keys: [at_k, epoch_k], argv: [nonce.to_i, now.to_i, epoch.to_i]
  ).to_i
end
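The HEARTBEAT_SCRIPT notes say a background thread calls this method every pp/3 seconds while steps run. One way such a loop could look is sketched below; `with_ordered_lock_heartbeat` is a hypothetical helper written for illustration, not the library's executor, and `storage` is assumed to be any object that responds to `ordered_lock_heartbeat` with the signature shown above.

```ruby
# Hypothetical helper: restamps `nonce` roughly every pp/3 seconds while the
# given block runs, then stops the background thread before returning.
def with_ordered_lock_heartbeat(storage, key, nonce:, epoch:, poison_pill_timeout:)
  interval = poison_pill_timeout / 3.0
  mutex = Mutex.new
  wake = ConditionVariable.new
  done = false

  beater = Thread.new do
    mutex.synchronize do
      until done
        wake.wait(mutex, interval) # sleeps up to `interval`, or until signalled
        break if done

        storage.ordered_lock_heartbeat(key, nonce: nonce, epoch: epoch)
      end
    end
  end

  yield
ensure
  if beater
    mutex.synchronize do
      done = true
      wake.signal # wake the thread right away instead of waiting out the interval
    end
    beater.join
  end
end
```

A caller would wrap step execution in it, for example `with_ordered_lock_heartbeat(adapter, key, nonce: n, epoch: e, poison_pill_timeout: 30) { run_steps }`, where `adapter` and `run_steps` are placeholders. A return value of 0 from the heartbeat is ignored in this sketch; a real executor might treat it as a sign that the nonce was already poison-passed.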

#ordered_lock_keys(key) ⇒ Object

Order matters: callers destructure positionally and some scripts take a subset. `epoch` is appended last so existing 4-key destructures keep working unchanged.



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 370

def ordered_lock_keys(key)
  tag = "{#{key}}"
  [
    "ordered_lock:#{tag}:next",
    "ordered_lock:#{tag}:last_completed",
    "ordered_lock:#{tag}:assigned_at",
    "ordered_lock:#{tag}:first_failed",
    "ordered_lock:#{tag}:epoch"
  ]
end
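The `{...}` wrapper matters because Redis Cluster hashes only the text between the first `{` and the next `}` when that text is non-empty. The sketch below checks that all five key names share one tag; `cluster_hash_tag` is a hypothetical helper that extracts the tag (it does not compute the CRC16 slot), and `account-42` is a made-up lock key.

```ruby
# Returns the part of a key that Redis Cluster hashes: the text between the
# first "{" and the next "}" when that text is non-empty, else the whole key.
def cluster_hash_tag(redis_key)
  start = redis_key.index("{")
  return redis_key unless start

  finish = redis_key.index("}", start + 1)
  return redis_key if finish.nil? || finish == start + 1

  redis_key[(start + 1)...finish]
end

# Key names built the same way as ordered_lock_keys, for a made-up lock key.
keys = %w[next last_completed assigned_at first_failed epoch].map do |suffix|
  "ordered_lock:{account-42}:#{suffix}"
end
tags = keys.map { |k| cluster_hash_tag(k) }.uniq # => ["account-42"]
```

Because every key yields the same tag, they map to the same slot, which is what allows a single Lua script to touch all of them on a cluster.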

#ordered_lock_peek(key) ⇒ Object



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 353

def ordered_lock_peek(key)
  next_k, last_k, at_k, fail_k = ordered_lock_keys(key)
  next_v = @redis.get(next_k)
  last_v = @redis.get(last_k)
  fail_v = @redis.get(fail_k)
  in_flight = @redis.hkeys(at_k).map(&:to_i).sort
  {
    next: next_v ? next_v.to_i : 0,
    last_completed: last_v ? last_v.to_i : 0,
    in_flight: in_flight,
    first_failed: fail_v ? fail_v.to_i : 0
  }
end

#ordered_lock_reset(key) ⇒ Object



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 349

def ordered_lock_reset(key)
  @redis.del(*ordered_lock_keys(key))
end

#ordered_lock_skip(key, nonce:) ⇒ Object



# File 'lib/ruby_reactor/storage/redis_ordered_locking.rb', line 335

def ordered_lock_skip(key, nonce:)
  @redis.eval(SKIP_SCRIPT, keys: ordered_lock_keys(key), argv: [nonce.to_i]).to_i
end