Class: RubyReactor::OrderedLock
- Inherits: Object (Object > RubyReactor::OrderedLock)
- Defined in: lib/ruby_reactor/ordered_lock.rb
Overview
Strict-ordering primitive. A monotonically increasing nonce is assigned at enqueue time; the worker can proceed only when its nonce equals `last_completed + 1`. Otherwise the gate check raises WaitError, which the Sidekiq worker rescues before re-snoozing the job via `perform_in`.
See `with_ordered_lock` for usage from a reactor.
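The ordering rule can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `ToyGate` and `ToyWaitError` are hypothetical names, the counters live in plain instance variables rather than Redis, and the poison-pill, epoch, and strict-mode behavior are left out.

```ruby
# Toy in-memory model of the ordering gate. ToyGate and ToyWaitError are
# hypothetical names for illustration; they are not part of RubyReactor.
class ToyWaitError < StandardError; end

class ToyGate
  attr_reader :last_completed

  def initialize
    @next = 0            # highest nonce handed out so far
    @last_completed = 0  # highest nonce that reached a terminal status
  end

  # Enqueue time: hand out the next nonce in the sequence.
  def assign
    @next += 1
  end

  # Worker side: only the nonce equal to last_completed + 1 may run.
  def check!(nonce)
    return :go if nonce == @last_completed + 1

    raise ToyWaitError, "nonce #{nonce} must wait for nonce #{@last_completed + 1}"
  end

  # Terminal status: open the gate for the next nonce (no-op for any other).
  def advance!(nonce)
    @last_completed = nonce if nonce == @last_completed + 1
  end
end

gate = ToyGate.new
first = gate.assign   # nonce 1
second = gate.assign  # nonce 2

begin
  gate.check!(second) # raises: nonce 1 has not completed yet
rescue ToyWaitError
  # A real worker would re-schedule itself here instead of running.
end

gate.check!(first)    # first may run
gate.advance!(first)
gate.check!(second)   # now second may run
```

In this model a job that arrives early never runs out of order; it fails the check and has to be retried later, which is the role the documented WaitError and `perform_in` re-snooze play.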
Defined Under Namespace
Classes: WaitError
Constant Summary
- DEFAULT_POISON_PILL_TIMEOUT = 600
Default poison-pill: if the blocker nonce was assigned more than `poison_pill_timeout` seconds ago and never advanced, the gate treats it as dead and advances past it. Prevents permanent head-of-line blocking from a crashed caller that INCRed but never enqueued.
- DEFAULT_TTL = 86_400
TTL on the Redis counter keys. Bumped on every assign so an active sequence never expires; only fully-drained ones GC themselves.
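The poison-pill rule amounts to a single age comparison. The sketch below is an assumption-laden illustration: `toy_blocker_dead?` and `TOY_POISON_PILL_TIMEOUT` are hypothetical names, and the real decision is made inside the storage adapter, whose internals are not shown on this page.

```ruby
# Hypothetical helper; not part of RubyReactor.
TOY_POISON_PILL_TIMEOUT = 600 # seconds, mirrors DEFAULT_POISON_PILL_TIMEOUT

# True when the blocking nonce was stamped more than `timeout` seconds before
# `now`, so a successor may treat it as dead and advance past it.
def toy_blocker_dead?(assigned_at:, now:, timeout: TOY_POISON_PILL_TIMEOUT)
  (now - assigned_at) > timeout
end

now = Time.now.to_f
toy_blocker_dead?(assigned_at: now - 30, now: now)   # recently stamped: keep waiting
toy_blocker_dead?(assigned_at: now - 900, now: now)  # stamped long ago: advance past it
```

Because the comparison is against the last stamp rather than the original assignment, restamping a slow but live blocker keeps it from being skipped.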
Instance Attribute Summary
-
#epoch ⇒ Object
readonly
Returns the value of attribute epoch.
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#nonce ⇒ Object
readonly
Returns the value of attribute nonce.
-
#poison_pill_timeout ⇒ Object
readonly
Returns the value of attribute poison_pill_timeout.
-
#strict ⇒ Object
readonly
Returns the value of attribute strict.
Class Method Summary
-
.assign(key, ttl: DEFAULT_TTL) ⇒ Object
Atomic INCR on the `next` counter.
-
.peek(key) ⇒ Object
Read-only inspection.
-
.reset!(key) ⇒ Object
Nuke all counters for a key.
-
.skip!(key, nonce:) ⇒ Object
Manual ops escape hatch — force-advance past a stuck nonce.
Instance Method Summary
-
#advance!(failed: false) ⇒ Object
Move `last_completed` forward.
-
#check! ⇒ Object
Gate check.
-
#heartbeat! ⇒ Object
Restamp this nonce’s `assigned_at` to “now” while its steps execute, so a successor does not poison-advance past a blocker that is merely slow (not dead).
-
#initialize(key, nonce: nil, epoch: nil, poison_pill_timeout: DEFAULT_POISON_PILL_TIMEOUT, ttl: DEFAULT_TTL, strict: true) ⇒ OrderedLock
constructor
Constructor Details
#initialize(key, nonce: nil, epoch: nil, poison_pill_timeout: DEFAULT_POISON_PILL_TIMEOUT, ttl: DEFAULT_TTL, strict: true) ⇒ OrderedLock
    # File 'lib/ruby_reactor/ordered_lock.rb', line 38
    def initialize(key, nonce: nil, epoch: nil, poison_pill_timeout: DEFAULT_POISON_PILL_TIMEOUT, # rubocop:disable Metrics/ParameterLists
                   ttl: DEFAULT_TTL, strict: true)
      @key = key
      @nonce = nonce
      @epoch = epoch
      @poison_pill_timeout = poison_pill_timeout
      @ttl = ttl
      @strict = strict
    end
Instance Attribute Details
#epoch ⇒ Object (readonly)
Returns the value of attribute epoch.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 36
    def epoch
      @epoch
    end
#key ⇒ Object (readonly)
Returns the value of attribute key.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 36
    def key
      @key
    end
#nonce ⇒ Object (readonly)
Returns the value of attribute nonce.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 36
    def nonce
      @nonce
    end
#poison_pill_timeout ⇒ Object (readonly)
Returns the value of attribute poison_pill_timeout.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 36
    def poison_pill_timeout
      @poison_pill_timeout
    end
#strict ⇒ Object (readonly)
Returns the value of attribute strict.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 36
    def strict
      @strict
    end
Class Method Details
.assign(key, ttl: DEFAULT_TTL) ⇒ Object
Atomic INCR on the `next` counter. Caller-side; runs during `Reactor.run` BEFORE `perform_async`. Returns `[nonce, epoch]`: the nonce we own plus the generation it belongs to (used to fence stale stragglers).
    # File 'lib/ruby_reactor/ordered_lock.rb', line 51
    def self.assign(key, ttl: DEFAULT_TTL)
      adapter = RubyReactor.configuration.storage_adapter
      adapter.ordered_lock_assign(key, ttl: ttl)
    end
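The caller-side ordering (take a nonce first, enqueue second) might look like the following sketch. Everything here is a stand-in: `ToyCounter` and `enqueue_ordered` are hypothetical names, the queue is a plain array instead of Sidekiq, the epoch is fixed at 1, and carrying the nonce and epoch in the job arguments is an assumption about how a worker could rebuild its lock.

```ruby
# Toy stand-ins; ToyCounter and enqueue_ordered are hypothetical names.
class ToyCounter
  def initialize
    @next = 0
    @epoch = 1 # assumption: a single fixed generation for the whole example
  end

  # Mirrors the documented return shape: [nonce, epoch].
  def assign
    @next += 1
    [@next, @epoch]
  end
end

# Take a place in line first, then enqueue with the nonce and epoch attached.
def enqueue_ordered(counter, queue, payload)
  nonce, epoch = counter.assign
  queue << { "payload" => payload, "nonce" => nonce, "epoch" => epoch }
  nonce
end

counter = ToyCounter.new
queue = []
enqueue_ordered(counter, queue, "create")
enqueue_ordered(counter, queue, "update")
```

Assigning before enqueueing means the position in the sequence is fixed by the order of the calls, not by the order in which workers happen to pick the jobs up.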
.peek(key) ⇒ Object
Read-only inspection. `{ next:, last_completed:, in_flight: […] }`.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 133
    def self.peek(key)
      RubyReactor.configuration.storage_adapter.ordered_lock_peek(key)
    end
.reset!(key) ⇒ Object
Nuke all counters for a key. Ops only; concurrent enqueues during reset produce undefined ordering.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 144
    def self.reset!(key)
      RubyReactor.configuration.storage_adapter.ordered_lock_reset(key)
    end
.skip!(key, nonce:) ⇒ Object
Manual ops escape hatch — force-advance past a stuck nonce.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 138
    def self.skip!(key, nonce:)
      RubyReactor.configuration.storage_adapter.ordered_lock_skip(key, nonce: nonce)
    end
Instance Method Details
#advance!(failed: false) ⇒ Object
Move `last_completed` forward. Idempotent: only the nonce equal to `last_completed + 1` advances; others are no-ops (the poison-pill path may have already skipped us).
Call on terminal status only (success, permanent failure, escalated skip). Retryable failures must NOT advance: the same nonce keeps ownership of the gate until the job either succeeds or exhausts its retry budget.
`failed:` records this nonce as the chain-failure marker (only the FIRST failure sticks). In strict mode the marker causes subsequent nonces to short-circuit with Skipped.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 115
    def advance!(failed: false)
      raise ArgumentError, "OrderedLock#advance! requires a nonce" unless @nonce

      adapter.ordered_lock_advance(@key, nonce: @nonce, failed: failed, epoch: @epoch.to_i, ttl: @ttl)
    end
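The idempotent advance and the first-failure marker can be modeled in a few lines. This is a sketch, not the adapter's logic: `ToySequence` is a hypothetical name, and the choice to record a failure only when the advance actually happens is an assumption the documentation above does not settle.

```ruby
# Toy model of advance semantics; ToySequence is a hypothetical name.
class ToySequence
  attr_reader :last_completed, :first_failed

  def initialize
    @last_completed = 0
    @first_failed = nil
  end

  # Returns true if the gate moved, false if the call was a no-op.
  def advance!(nonce, failed: false)
    return false unless nonce == @last_completed + 1

    @last_completed = nonce
    # Assumption: the marker is written only on a real advance.
    # `||=` keeps the first failure and ignores later ones.
    @first_failed ||= nonce if failed
    true
  end
end

seq = ToySequence.new
seq.advance!(2)               # no-op: nonce 1 is still the head
seq.advance!(1, failed: true) # advances and records the chain failure
seq.advance!(1)               # no-op: already advanced
seq.advance!(2, failed: true) # advances; the marker stays on nonce 1
```

Making repeat and out-of-turn calls no-ops is what lets a job that was already skipped by the poison-pill path call `advance!` safely when it eventually finishes.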
#check! ⇒ Object
Gate check. Returns `:go`, `:drained_go`, `:skip_chain_failed`, or `:stale_batch`, or raises WaitError.
- `:go`: proceed to run steps.
- `:drained_go`: the batch fully drained and was GC’d while this caller slept. A genuine late straggler should run; a Sidekiq redelivery of an already-terminal context should be skipped. The executor disambiguates via the stored context status.
- `:skip_chain_failed`: strict mode only. An earlier nonce in this sequence terminated with a Failure, so this run is short-circuited with `Skipped(reason: :ordered_lock_chain_failed)` without executing.
- `:stale_batch`: this run’s epoch no longer matches the key’s current generation because its batch fully drained and the numbering was reused by a newer batch. The run is short-circuited with `Skipped(reason: :ordered_lock_stale_batch)` and must not participate.
- `:poison_advance` is collapsed to `:go` from the caller’s perspective.
    # File 'lib/ruby_reactor/ordered_lock.rb', line 71
    def check!
      raise ArgumentError, "OrderedLock#check! requires a nonce" unless @nonce

      state, retry_after, last_completed, first_failed = adapter.ordered_lock_can_proceed(
        @key, nonce: @nonce, poison_pill_timeout: @poison_pill_timeout, epoch: @epoch.to_i
      )

      case state
      when "go", "poison_advance"
        chain_failed?(first_failed) ? :skip_chain_failed : :go
      when "drained_go"
        # Batch fully drained and GC'd while this caller slept. A genuine late
        # straggler may run (poison semantics); a redelivery of an
        # already-terminal context must not. The executor disambiguates via the
        # stored context status. (fail_key is GC'd here, so no chain check.)
        :drained_go
      when "stale"
        :stale_batch
      when "wait"
        raise WaitError.new(
          key: @key,
          nonce: @nonce,
          last_completed: last_completed,
          retry_after_seconds: retry_after
        )
      else
        raise "Unexpected OrderedLock state: #{state.inspect}"
      end
    end
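A caller has to branch on each of the four return values listed above. The sketch below shows one possible shape for that branch. `toy_dispatch` is a hypothetical helper, and the action values it returns are invented for this example; only the gate results and the two skip reasons come from the documentation.

```ruby
# Hypothetical dispatcher: maps a gate result to what the caller does next.
def toy_dispatch(gate_result)
  case gate_result
  when :go
    :run_steps
  when :drained_go
    :consult_stored_status # run only if the stored context is not already terminal
  when :skip_chain_failed
    [:skipped, :ordered_lock_chain_failed]
  when :stale_batch
    [:skipped, :ordered_lock_stale_batch]
  else
    raise ArgumentError, "unknown gate result: #{gate_result.inspect}"
  end
end

toy_dispatch(:go)          # the steps run
toy_dispatch(:stale_batch) # the run is skipped with the stale-batch reason
```

The waiting case is absent on purpose: it surfaces as a raised WaitError rather than a return value, so it is handled by a `rescue` around the check instead of a `when` branch.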
#heartbeat! ⇒ Object
Restamp this nonce’s `assigned_at` to “now” while its steps execute, so a successor does not poison-advance past a blocker that is merely slow (not dead). Called on an interval by a background heartbeat thread for the duration of step execution. No-op if the nonce’s timer was already deleted by a terminal advance, or if the batch has gone stale (epoch fence).
    # File 'lib/ruby_reactor/ordered_lock.rb', line 126
    def heartbeat!
      return unless @nonce

      adapter.ordered_lock_heartbeat(@key, nonce: @nonce, epoch: @epoch.to_i)
    end
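A background heartbeat thread of the kind described above could be structured as follows. This is a minimal sketch under assumptions: `with_heartbeat` is a hypothetical helper, `beat` is any callable (in real use it might wrap `lock.heartbeat!`), and the library's actual thread management is not shown on this page.

```ruby
# Hypothetical helper: calls `beat` every `interval` seconds while the block runs.
def with_heartbeat(beat, interval: 0.05)
  running = true
  thread = Thread.new do
    while running
      beat.call
      sleep interval
    end
  end
  yield
ensure
  # Stop the loop and wait for the thread, even if the block raised.
  running = false
  thread&.join
end

beats = 0
result = with_heartbeat(-> { beats += 1 }, interval: 0.01) do
  sleep 0.05 # stands in for slow step execution
  :done
end
```

The `ensure` clause matters here: if the steps raise, the heartbeat must still stop, otherwise a dead job would keep restamping itself and never become eligible for the poison-pill path.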