Class: RubyReactor::SidekiqWorkers::SweeperWorker
- Inherits:
-
Object
- Object
- RubyReactor::SidekiqWorkers::SweeperWorker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb
Overview
Self-rescheduling recovery tick. Each run sweeps both the top-level reactor sweeper and the map sweeper, then schedules the next tick — a perpetual chain the host kicks once via ‘RubyReactor.start_sweeper!`.
super_fetch safety. Sidekiq Enterprise ‘super_fetch` reliably re-runs a job whose worker died mid-execution. For a self-rescheduling chain that is a hazard: a tick can crash AFTER enqueuing its successor but BEFORE acking, so super_fetch recovers the crashed tick alongside the successor it already scheduled — the chain forks and then doubles every interval. We therefore do NOT rely on “exactly one job exists”. The next tick is claimed by a per-time-window lock: every duplicate computes the SAME target window and only one wins the claim, so recovered/duplicated ticks collapse back to a single chain. The claim lock is never released — it simply expires — so no delete can race two duplicates into both winning.
Class Method Summary collapse
-
.schedule_next ⇒ Object
Enqueue the next tick for the upcoming time window, claiming that window so concurrent/duplicate/recovered ticks produce exactly one successor.
Instance Method Summary collapse
Class Method Details
.schedule_next ⇒ Object
Enqueue the next tick for the upcoming time window, claiming that window so concurrent/duplicate/recovered ticks produce exactly one successor. Idempotent: also safe to call from ‘start_sweeper!` on every process boot.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 52 def self.schedule_next interval = RubyReactor.configuration.sweeper_interval window = (Time.now.to_i / interval) + 1 lock = RubyReactor::Lock.new( "sweeper:window:#{window}", owner: SecureRandom.uuid, ttl: interval * 2, # outlive the window; expires on its own (never released) wait: 0, auto_extend: false ) lock.acquire # raises AcquisitionError if this window is already claimed delay = (window * interval) - Time.now.to_i perform_in([delay, 1].max) rescue RubyReactor::Lock::AcquisitionError # Another tick already scheduled this window — collapse the duplicate. nil end |
Instance Method Details
#perform ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 30 def perform config = RubyReactor.configuration return unless config.sweeper_enabled run_sweeps(config) ensure # Always chain forward (unless disabled), even after an error above, so a # single bad sweep can't kill recovery. The window lock keeps this from # forking under super_fetch. self.class.schedule_next if RubyReactor.configuration.sweeper_enabled end |
#run_sweeps(config) ⇒ Object
42 43 44 45 46 47 |
# File 'lib/ruby_reactor/sidekiq_workers/sweeper_worker.rb', line 42 def run_sweeps(config) RubyReactor::Sweeper.run_once(limit: config.sweeper_limit) RubyReactor::Map::Sweeper.run_once(limit: config.sweeper_limit) rescue StandardError => e config.logger.error("RubyReactor::SweeperWorker sweep failed: #{e.class}: #{e.}") end |