Class: RubyReactor::SidekiqWorkers::SweeperWorker

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb

Overview

Self-rescheduling recovery tick. Each run sweeps both the top-level reactor sweeper and the map sweeper, then schedules the next tick — a perpetual chain the host kicks once via ‘RubyReactor.start_sweeper!`.

super_fetch safety. Sidekiq Enterprise ‘super_fetch` reliably re-runs a job whose worker died mid-execution. For a self-rescheduling chain that is a hazard: a tick can crash AFTER enqueuing its successor but BEFORE acking, so super_fetch recovers the crashed tick alongside the successor it already scheduled — the chain forks and then doubles every interval. We therefore do NOT rely on “exactly one job exists”. The next tick is claimed by a per-time-window lock: every duplicate computes the SAME target window and only one wins the claim, so recovered/duplicated ticks collapse back to a single chain. The claim lock is never released — it simply expires — so no delete can race two duplicates into both winning.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.schedule_nextObject

Enqueue the next tick for the upcoming time window, claiming that window so concurrent/duplicate/recovered ticks produce exactly one successor. Idempotent: also safe to call from ‘start_sweeper!` on every process boot.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 52

def self.schedule_next
  interval = RubyReactor.configuration.sweeper_interval
  window = (Time.now.to_i / interval) + 1

  lock = RubyReactor::Lock.new(
    "sweeper:window:#{window}",
    owner: SecureRandom.uuid,
    ttl: interval * 2, # outlive the window; expires on its own (never released)
    wait: 0,
    auto_extend: false
  )
  lock.acquire # raises AcquisitionError if this window is already claimed

  delay = (window * interval) - Time.now.to_i
  perform_in([delay, 1].max)
rescue RubyReactor::Lock::AcquisitionError
  # Another tick already scheduled this window — collapse the duplicate.
  nil
end

Instance Method Details

#performObject



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 30

def perform
  config = RubyReactor.configuration
  return unless config.sweeper_enabled

  run_sweeps(config)
ensure
  # Always chain forward (unless disabled), even after an error above, so a
  # single bad sweep can't kill recovery. The window lock keeps this from
  # forking under super_fetch.
  self.class.schedule_next if RubyReactor.configuration.sweeper_enabled
end

#run_sweeps(config) ⇒ Object



42
43
44
45
46
47
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 42

def run_sweeps(config)
  RubyReactor::Sweeper.run_once(limit: config.sweeper_limit)
  RubyReactor::Map::Sweeper.run_once(limit: config.sweeper_limit)
rescue StandardError => e
  config.logger.error("RubyReactor::SweeperWorker sweep failed: #{e.class}: #{e.message}")
end