Class: Wurk::Leader

Inherits:
Object
  • Object
show all
Defined in:
lib/wurk/leader.rb

Overview

Cluster leader election via Redis ‘SET NX EX`. Single-leader-per-cluster is best-effort (not Raft): a partitioned ex-leader can briefly co-exist with a new one until the TTL expires. Callers that need strict mutual exclusion must idempotency-guard their writes (see `#token`).

Wire-compat: the cluster lock lives at ‘dear-leader` (STRING, EX≈30s) holding the `<hostname>:<pid>:<process_nonce>` identity. Each fresh gain also pulls a monotonic fencing token via `INCR leader-token`, exposed on `#token`. Wurk goes a small step beyond Sidekiq Enterprise (which deliberately does not expose fencing) so downstream code that can benefit from a guard has one available; the token is best-effort too — it is never re-read on subsequent acquires, only on transitions.

Cadence per spec: renew every 15s while leader, recheck every 60s as follower, lock TTL 30s. Opt out a process from campaigning entirely with ‘WURK_LEADER=false` (or its Sidekiq alias `SIDEKIQ_LEADER=false`), useful for hot-standby pools.

Spec: docs/target/sidekiq-ent.md §6.

Constant Summary collapse

DEFAULT_KEY =
'dear-leader'
TOKEN_KEY =
'leader-token'
DEFAULT_TTL =
30
DEFAULT_RENEW_INTERVAL =
15
DEFAULT_FOLLOWER_INTERVAL =
60
OPT_OUT_ENV =

native opt-out env

'WURK_LEADER'
SIDEKIQ_OPT_OUT_ENV =

Sidekiq Ent drop-in alias (§6.2/§7.2)

'SIDEKIQ_LEADER'
THREAD_NAME =
'wurk-leader'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: nil, key: DEFAULT_KEY, ttl: DEFAULT_TTL, renew_interval: DEFAULT_RENEW_INTERVAL, follower_interval: DEFAULT_FOLLOWER_INTERVAL, pool: nil, owner: nil) ⇒ Leader

rubocop:disable Metrics/ParameterLists



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/wurk/leader.rb', line 46

def initialize(config: nil, key: DEFAULT_KEY, ttl: DEFAULT_TTL, # rubocop:disable Metrics/ParameterLists
               renew_interval: DEFAULT_RENEW_INTERVAL,
               follower_interval: DEFAULT_FOLLOWER_INTERVAL,
               pool: nil, owner: nil)
  @config = config
  @key = key
  @ttl = ttl
  @renew_interval = renew_interval
  @follower_interval = follower_interval
  @pool = pool
  @owner = owner || cluster_identity
  @held = false
  @token = nil
  @thread = nil
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



44
45
46
# File 'lib/wurk/leader.rb', line 44

def config
  @config
end

#keyObject (readonly)

Returns the value of attribute key.



44
45
46
# File 'lib/wurk/leader.rb', line 44

def key
  @key
end

#ownerObject (readonly)

Returns the value of attribute owner.



44
45
46
# File 'lib/wurk/leader.rb', line 44

def owner
  @owner
end

#tokenObject (readonly)

Returns the value of attribute token.



44
45
46
# File 'lib/wurk/leader.rb', line 44

def token
  @token
end

#ttlObject (readonly)

Returns the value of attribute ttl.



44
45
46
# File 'lib/wurk/leader.rb', line 44

def ttl
  @ttl
end

Class Method Details

.opted_out?Boolean

True when this process has opted out of campaigning via ‘WURK_LEADER=false` or its Sidekiq alias `SIDEKIQ_LEADER=false` (hot-standby pools that must never lead). Either env name works.

Returns:

  • (Boolean)


40
41
42
# File 'lib/wurk/leader.rb', line 40

def self.opted_out?
  [OPT_OUT_ENV, SIDEKIQ_OPT_OUT_ENV].any? { |k| ENV[k].to_s.downcase == 'false' }
end

Instance Method Details

#acquireObject

SET NX EX. If the key already holds our owner string (rare — same process re-entering after a hiccup), refresh via EXPIRE so leadership doesn’t lapse. On any follower → leader transition, INCR the global ‘leader-token` so the new token is strictly greater than every prior leader’s, then dispatch the ‘:leader` lifecycle event.



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/wurk/leader.rb', line 77

def acquire # rubocop:disable Naming/PredicateMethod
  return false if disabled?

  transition = run_set_or_refresh
  return false unless transition

  if transition == :gained
    @token = redis_call { |c| c.call('INCR', TOKEN_KEY) }
    dispatch_leader_event
  end
  true
end

#disabled?Boolean

‘WURK_LEADER=false` (or `SIDEKIQ_LEADER=false`) makes `acquire` a no-op and `leader?` permanently false; the renewal thread also refuses to start. Useful for hot-standby pools that must never campaign.

Returns:

  • (Boolean)


68
69
70
# File 'lib/wurk/leader.rb', line 68

def disabled?
  self.class.opted_out?
end

#leader?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/wurk/leader.rb', line 101

def leader?
  @held
end

#releaseObject

CAS DEL — only drop the key if we still own it, otherwise a stale release would yank leadership from whichever follower took over.



92
93
94
95
96
97
98
99
# File 'lib/wurk/leader.rb', line 92

def release
  redis_call do |c|
    c.call('DEL', @key) if c.call('GET', @key) == @owner
  end
  @held = false
  @token = nil
  nil
end

#running?Boolean

Returns:

  • (Boolean)


131
132
133
# File 'lib/wurk/leader.rb', line 131

def running?
  !@thread.nil? && @thread.alive?
end

#startObject

Spawns the periodic re-election thread. Idempotent. While leader, the loop re-acquires (refreshing TTL) every ‘renew_interval`; while follower, it polls every `follower_interval`. Caller must invoke `stop` for orderly shutdown — the thread also releases its lock on exit.



110
111
112
113
114
115
116
117
118
119
# File 'lib/wurk/leader.rb', line 110

def start
  return nil if disabled?

  @mutex.synchronize do
    return @thread if @thread

    @done = false
  end
  @thread = spawn_loop_thread
end

#stopObject



121
122
123
124
125
126
127
128
129
# File 'lib/wurk/leader.rb', line 121

def stop
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
  @thread&.join
  @thread = nil
  release
end