Class: RubyReactor::OrderedLock

Inherits:
Object
  • Object
Defined in:
lib/ruby_reactor/ordered_lock.rb

Overview

Strict-ordering primitive. A monotonically increasing nonce is assigned at enqueue time; the worker can proceed only when its nonce equals `last_completed + 1`. Otherwise the gate check raises WaitError, which the Sidekiq worker rescues before re-snoozing itself via `perform_in`.

See `with_ordered_lock` for usage from a reactor.
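
The gating rule can be modelled in a few lines of plain Ruby. This is only an in-memory sketch: the class `ToyOrderedGate`, its nested `WaitError`, and the assumption that nonces start at 1 with `last_completed` starting at 0 are illustrative choices, not the library's Redis-backed implementation.

```ruby
# Minimal in-memory model of the ordering gate (illustrative only; the real
# state lives behind the storage adapter).
class ToyOrderedGate
  # Hypothetical stand-in for RubyReactor::OrderedLock::WaitError.
  class WaitError < StandardError; end

  attr_reader :last_completed

  def initialize
    @next = 0            # last nonce handed out (assumed to start at 0)
    @last_completed = 0  # highest nonce that reached a terminal state
  end

  # Enqueue side: hand out the next nonce in the sequence.
  def assign
    @next += 1
  end

  # Worker side: only the nonce directly after last_completed may run.
  def check!(nonce)
    return :go if nonce == @last_completed + 1

    raise WaitError, "nonce #{nonce} waits for #{@last_completed + 1}"
  end

  # Terminal completion: move the gate forward, but only for the owner.
  def advance!(nonce)
    @last_completed += 1 if nonce == @last_completed + 1
  end
end

gate = ToyOrderedGate.new
first = gate.assign
second = gate.assign

begin
  gate.check!(second) # out of order: the first job has not completed yet
rescue ToyOrderedGate::WaitError
  # A real worker would re-schedule itself here (for example via perform_in).
end

gate.check!(first)   # returns :go
gate.advance!(first)
gate.check!(second)  # returns :go now that the first nonce is complete
```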

Defined Under Namespace

Classes: WaitError

Constant Summary

DEFAULT_POISON_PILL_TIMEOUT =

Default poison-pill timeout, in seconds. If the blocker nonce was assigned more than `poison_pill_timeout` seconds ago and never advanced, the gate treats it as dead and advances past it. This prevents permanent head-of-line blocking from a crashed caller that INCRed but never enqueued.

600
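
The timeout rule above amounts to a simple age comparison. The helper below is a hypothetical sketch (the name `blocker_dead?` and its keyword arguments are not part of the library) that treats a blocker as dead only when strictly more than the timeout has elapsed since it was assigned.

```ruby
# Hypothetical helper: true when the blocker nonce was assigned more than
# `timeout` seconds before `now`, so the gate may advance past it.
def blocker_dead?(assigned_at:, now:, timeout: 600)
  (now - assigned_at) > timeout
end

now = Time.at(1_000_000)
blocker_dead?(assigned_at: now - 601, now: now) # => true
blocker_dead?(assigned_at: now - 30, now: now)  # => false
```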
DEFAULT_TTL =

TTL on the Redis counter keys. Bumped on every assign so an active sequence never expires; only fully-drained ones GC themselves.

86_400

Constructor Details

#initialize(key, nonce: nil, epoch: nil, poison_pill_timeout: DEFAULT_POISON_PILL_TIMEOUT, ttl: DEFAULT_TTL, strict: true) ⇒ OrderedLock

# File 'lib/ruby_reactor/ordered_lock.rb', line 38

def initialize(key, nonce: nil, epoch: nil, poison_pill_timeout: DEFAULT_POISON_PILL_TIMEOUT, # rubocop:disable Metrics/ParameterLists
               ttl: DEFAULT_TTL, strict: true)
  @key = key
  @nonce = nonce
  @epoch = epoch
  @poison_pill_timeout = poison_pill_timeout
  @ttl = ttl
  @strict = strict
end

Instance Attribute Details

#epoch ⇒ Object (readonly)

Returns the value of attribute epoch.



# File 'lib/ruby_reactor/ordered_lock.rb', line 36

def epoch
  @epoch
end

#key ⇒ Object (readonly)

Returns the value of attribute key.



# File 'lib/ruby_reactor/ordered_lock.rb', line 36

def key
  @key
end

#nonce ⇒ Object (readonly)

Returns the value of attribute nonce.



# File 'lib/ruby_reactor/ordered_lock.rb', line 36

def nonce
  @nonce
end

#poison_pill_timeout ⇒ Object (readonly)

Returns the value of attribute poison_pill_timeout.



# File 'lib/ruby_reactor/ordered_lock.rb', line 36

def poison_pill_timeout
  @poison_pill_timeout
end

#strict ⇒ Object (readonly)

Returns the value of attribute strict.



# File 'lib/ruby_reactor/ordered_lock.rb', line 36

def strict
  @strict
end

Class Method Details

.assign(key, ttl: DEFAULT_TTL) ⇒ Object

Atomic INCR on the `next` counter. Caller-side: runs during `Reactor.run`, BEFORE `perform_async`. Returns `[nonce, epoch]`: the nonce we own plus the generation it belongs to (used to fence stale stragglers).



# File 'lib/ruby_reactor/ordered_lock.rb', line 51

def self.assign(key, ttl: DEFAULT_TTL)
  adapter = RubyReactor.configuration.storage_adapter
  adapter.ordered_lock_assign(key, ttl: ttl)
end
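
To illustrate why the increment has to be atomic, here is a sketch with an in-memory stand-in for the storage adapter. `ToyAdapter` is hypothetical; it uses a Mutex where the real adapter relies on Redis, and it assumes nonces start at 1 and a fresh key belongs to epoch 1, neither of which is confirmed by the source.

```ruby
# In-memory stand-in for the storage adapter (hypothetical). A Mutex models
# the atomicity that INCR provides in Redis.
class ToyAdapter
  def initialize
    @mutex = Mutex.new
    @counters = Hash.new(0) # per-key `next` counter
    @epochs = Hash.new(1)   # per-key generation (assumed to start at 1)
    @ttls = {}
  end

  # Mirrors the call shape used by OrderedLock.assign.
  def ordered_lock_assign(key, ttl:)
    @mutex.synchronize do
      @ttls[key] = ttl # every assign refreshes the TTL on the key
      nonce = (@counters[key] += 1)
      [nonce, @epochs[key]]
    end
  end
end

adapter = ToyAdapter.new
threads = 8.times.map do
  Thread.new { adapter.ordered_lock_assign("orders:42", ttl: 86_400) }
end
pairs = threads.map(&:value)
pairs.map(&:first).sort # => [1, 2, 3, 4, 5, 6, 7, 8]
```

Because each caller receives a distinct nonce before its job is enqueued, the processing order is fixed at assignment time regardless of the order in which workers later pick the jobs up.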

.peek(key) ⇒ Object

Read-only inspection. Returns `{ next:, last_completed:, in_flight: […] }`.



# File 'lib/ruby_reactor/ordered_lock.rb', line 133

def self.peek(key)
  RubyReactor.configuration.storage_adapter.ordered_lock_peek(key)
end

.reset!(key) ⇒ Object

Nuke all counters for a key. Ops only; concurrent enqueues during reset produce undefined ordering.



# File 'lib/ruby_reactor/ordered_lock.rb', line 144

def self.reset!(key)
  RubyReactor.configuration.storage_adapter.ordered_lock_reset(key)
end

.skip!(key, nonce:) ⇒ Object

Manual ops escape hatch — force-advance past a stuck nonce.



# File 'lib/ruby_reactor/ordered_lock.rb', line 138

def self.skip!(key, nonce:)
  RubyReactor.configuration.storage_adapter.ordered_lock_skip(key, nonce: nonce)
end

Instance Method Details

#advance!(failed: false) ⇒ Object

Move `last_completed` forward. Idempotent: only the nonce equal to `last_completed + 1` advances; others are no-ops (the poison-pill path may have already skipped us).

Call on terminal status only (success, permanent failure, escalated skip). Retryable failures must NOT advance: the same nonce keeps owning the gate until the job either succeeds or exhausts its retry budget.

`failed:` records this nonce as the chain-failure marker (only the FIRST failure sticks). In strict mode the marker causes subsequent nonces to short-circuit with Skipped.

Raises:

  • (ArgumentError) if the lock was constructed without a nonce


# File 'lib/ruby_reactor/ordered_lock.rb', line 115

def advance!(failed: false)
  raise ArgumentError, "OrderedLock#advance! requires a nonce" unless @nonce

  adapter.ordered_lock_advance(@key, nonce: @nonce, failed: failed, epoch: @epoch.to_i, ttl: @ttl)
end
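
The idempotency and first-failure rules can be sketched in memory as follows. `ToySequence` is a hypothetical model; in particular, recording the failure marker only when the call actually advances the gate is an assumption of this sketch, not something the source states about the adapter.

```ruby
# In-memory model of the advance rules (illustrative only).
class ToySequence
  attr_reader :last_completed, :first_failed

  def initialize
    @last_completed = 0
    @first_failed = nil
  end

  # Returns true if this call moved the gate, false if it was a no-op.
  def advance(nonce, failed: false)
    return false unless nonce == @last_completed + 1

    @first_failed ||= nonce if failed # only the first failure sticks
    @last_completed = nonce
    true
  end
end

seq = ToySequence.new
seq.advance(2)               # => false (not the owner yet)
seq.advance(1, failed: true) # => true, records 1 as the failure marker
seq.advance(1, failed: true) # => false (repeat call is a no-op)
seq.advance(2, failed: true) # => true, but first_failed stays 1
```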

#check! ⇒ Object

Gate check. Returns `:go`, `:drained_go`, `:skip_chain_failed`, `:stale_batch`, or raises WaitError.

  • `:go`: proceed to run steps.

  • `:drained_go`: the batch fully drained and GC’d while this caller slept. A genuine late straggler should run; a Sidekiq redelivery of an already-terminal context should be skipped. The executor disambiguates via the stored context status.

  • `:skip_chain_failed`: only in strict mode. An earlier nonce in this sequence terminated with a Failure, so this run is short-circuited with `Skipped(reason: :ordered_lock_chain_failed)` without executing.

  • `:stale_batch`: this run’s epoch no longer matches the key’s current generation. Its batch fully drained and the numbering was reused by a newer batch. The run is short-circuited with `Skipped(reason: :ordered_lock_stale_batch)` and must not participate.

  • The adapter’s `poison_advance` state is never returned as such: it is handled exactly like `go`, so the caller sees `:go` (or `:skip_chain_failed` when the chain-failure check applies).

Raises:

  • (ArgumentError) if the lock was constructed without a nonce


# File 'lib/ruby_reactor/ordered_lock.rb', line 71

def check!
  raise ArgumentError, "OrderedLock#check! requires a nonce" unless @nonce

  state, retry_after, last_completed, first_failed = adapter.ordered_lock_can_proceed(
    @key,
    nonce: @nonce,
    poison_pill_timeout: @poison_pill_timeout,
    epoch: @epoch.to_i
  )

  case state
  when "go", "poison_advance"
    chain_failed?(first_failed) ? :skip_chain_failed : :go
  when "drained_go"
    # Batch fully drained and GC'd while this caller slept. A genuine late
    # straggler may run (poison semantics); a redelivery of an
    # already-terminal context must not. The executor disambiguates via the
    # stored context status. (fail_key is GC'd here, so no chain check.)
    :drained_go
  when "stale"
    :stale_batch
  when "wait"
    raise WaitError.new(
      key: @key,
      nonce: @nonce,
      last_completed: last_completed,
      retry_after_seconds: retry_after
    )
  else
    raise "Unexpected OrderedLock state: #{state.inspect}"
  end
end
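
A caller might dispatch on the outcomes listed above roughly as follows. Everything here is a stand-in so the sketch runs without RubyReactor or Sidekiq: `ToyWaitError`, `FakeLock`, `handle_gate`, and the returned action symbols are hypothetical, and the `retry_after_seconds` reader on the error is an assumption based on the keyword passed when WaitError is raised.

```ruby
# Hypothetical stand-in for OrderedLock::WaitError.
class ToyWaitError < StandardError
  attr_reader :retry_after_seconds

  def initialize(retry_after_seconds:)
    @retry_after_seconds = retry_after_seconds
    super("blocked; retry in #{retry_after_seconds}s")
  end
end

# Maps a gate outcome to the action a worker might take. `lock` only needs
# to respond to #check!.
def handle_gate(lock)
  case lock.check!
  when :go then [:run]
  when :drained_go then [:run_unless_already_terminal]
  when :skip_chain_failed then [:skip, :ordered_lock_chain_failed]
  when :stale_batch then [:skip, :ordered_lock_stale_batch]
  end
rescue ToyWaitError => e
  # A Sidekiq worker would call perform_in(e.retry_after_seconds, ...) here.
  [:snooze, e.retry_after_seconds]
end

# Scripted lock: returns the given outcome, or raises it if it is an error.
FakeLock = Struct.new(:outcome) do
  def check!
    raise outcome if outcome.is_a?(Exception)

    outcome
  end
end

handle_gate(FakeLock.new(:go))                                      # => [:run]
handle_gate(FakeLock.new(ToyWaitError.new(retry_after_seconds: 5))) # => [:snooze, 5]
```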

#heartbeat! ⇒ Object

Restamp this nonce’s `assigned_at` to “now” while its steps execute, so a successor does not poison-advance past a blocker that is merely slow (not dead). Called on an interval by a background heartbeat thread for the duration of step execution. No-op if the nonce’s timer was already deleted by a terminal advance, or if the batch has gone stale (epoch fence).



# File 'lib/ruby_reactor/ordered_lock.rb', line 126

def heartbeat!
  return unless @nonce

  adapter.ordered_lock_heartbeat(@key, nonce: @nonce, epoch: @epoch.to_i)
end
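
One way to drive such a heartbeat for the duration of a block is sketched below. The helper `with_heartbeat` and its parameters are hypothetical and not taken from the library; it uses only Ruby's standard `Thread`, `Mutex`, and `ConditionVariable`.

```ruby
# Runs the given block while a background thread calls `beat` roughly every
# `interval` seconds. The thread is stopped and joined once the block
# finishes, even if the block raises.
def with_heartbeat(interval:, beat:)
  mutex = Mutex.new
  wake = ConditionVariable.new
  done = false

  thread = Thread.new do
    mutex.synchronize do
      until done
        beat.call
        # Sleeps up to `interval` seconds, or until signalled to stop.
        wake.wait(mutex, interval)
      end
    end
  end

  yield
ensure
  mutex.synchronize do
    done = true
    wake.signal
  end
  thread.join
end

beats = 0
result = with_heartbeat(interval: 0.01, beat: -> { beats += 1 }) do
  sleep 0.1 # stands in for slow step execution
  :finished
end
```

With a real lock the `beat` callable would be something like `-> { lock.heartbeat! }`, and the interval would presumably be chosen well below `poison_pill_timeout` so that several beats land inside each timeout window.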