Class: RubyReactor::Sweeper
- Inherits:
- Object
- RubyReactor::Sweeper
- Defined in:
- lib/ruby_reactor/sweeper.rb
Overview
Re-enqueues non-terminal top-level reactor contexts whose worker died.
The per-context liveness lock (`async:<id>`, Phase 1) is the signal: a live worker holds and auto-extends it, so its ABSENCE on a context still marked `running` means the worker crashed without finishing. The sweeper re-enqueues such contexts by id (identity-only payload, Phase 2).
`run_once` is pure and idempotent, so call it periodically; the cadence is the host's to wire (sidekiq-cron, sidekiq-scheduler, a self-rescheduling worker, or external cron). The interval bounds recovery latency. No scheduling dependency is added to the gem.
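As an illustration of host-side wiring, here is a minimal sketch of a periodic driver. `sweep_periodically`, `FakeSweeper`, and the `sleeper:` parameter are hypothetical names introduced for this example and are not part of the gem; the only assumption about the sweeper is that it responds to `run_once` and returns the number of contexts re-enqueued.

```ruby
# Hypothetical driver: calls `run_once` on any object that responds to it,
# pausing `interval` seconds between sweeps. `iterations` bounds the loop so
# the sketch terminates; a real host would loop until shutdown instead.
def sweep_periodically(sweeper, interval:, iterations:, sleeper: method(:sleep))
  total = 0
  iterations.times do |i|
    total += sweeper.run_once
    sleeper.call(interval) if i < iterations - 1 # no pause after the last sweep
  end
  total
end

# Stand-in for RubyReactor::Sweeper so the sketch runs without the gem.
class FakeSweeper
  def initialize(results)
    @results = results
  end

  # Returns the next canned re-enqueue count, or 0 when none are left.
  def run_once
    @results.shift || 0
  end
end

naps = []
recovered = sweep_periodically(
  FakeSweeper.new([2, 0, 1]),
  interval: 30,
  iterations: 3,
  sleeper: ->(secs) { naps << secs } # record pauses instead of sleeping
)
puts "re-enqueued #{recovered} contexts over #{naps.size + 1} sweeps"
```

In a host application the first argument would presumably be `RubyReactor::Sweeper.new`, with the interval chosen to match the recovery latency the host can tolerate.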
Safety depends on Phase 1: if a context is mis-judged dead (GC pause, liveness race) and re-enqueued while its worker is actually alive, the duplicate hits the live lock -> ContextLockContention -> uncapped snooze -> no double run.
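The safety argument above can be pictured with an in-memory sketch. Everything here is a stand-in: `FakeLocks`, `run_context`, and the local `ContextLockContention` class are hypothetical and only mimic the behavior described; the gem's real lock store, error class, and snooze mechanism may differ.

```ruby
# Local stand-in for the error named above; the real class lives in the gem.
class ContextLockContention < StandardError; end

# In-memory stand-in for the liveness lock store.
class FakeLocks
  def initialize
    @owners = {}
  end

  # Returns true if `owner` now holds the lock, false if another owner does.
  def acquire(key, owner)
    return false if @owners.key?(key) && @owners[key] != owner

    @owners[key] = owner
    true
  end
end

# Runs the context only when the liveness lock can be taken.
def run_context(locks, context_id, worker, log)
  key = "async:#{context_id}"
  raise ContextLockContention, key unless locks.acquire(key, worker)

  log << [:ran, worker]
end

locks = FakeLocks.new
log = []

# The original worker is alive and keeps holding the lock.
run_context(locks, "ctx-1", "worker-a", log)

begin
  # Duplicate job produced by a mistaken sweep.
  run_context(locks, "ctx-1", "worker-b", log)
rescue ContextLockContention
  # A real worker would reschedule itself (snooze) here rather than run.
  log << [:snoozed, "worker-b"]
end

p log
```

The duplicate never reaches the body of the context, which is why a false "dead" verdict costs only a wasted job and not a double run.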
Map fan-out (element/collector jobs) is NOT covered here — those contexts carry parent_context_id and scan_reactors filters them out (F6). The map sweeper (Phase 5) owns them.
Constant Summary
- DEFAULT_LIMIT =
Default upper bound on contexts inspected per sweep. scan_reactors caps its result at this count; a host with more in-flight reactors than this should raise it (or sweep more frequently).
1000
Class Method Summary
-
.run_once(limit: DEFAULT_LIMIT) ⇒ Object
Instance Method Summary
-
#initialize(storage: nil, async_router: nil, logger: nil) ⇒ Sweeper
constructor
A new instance of Sweeper.
-
#run_once(limit: DEFAULT_LIMIT) ⇒ Object
Scans stored top-level reactors and re-enqueues the running-but-unlocked ones.
Constructor Details
#initialize(storage: nil, async_router: nil, logger: nil) ⇒ Sweeper
Returns a new instance of Sweeper.
# File 'lib/ruby_reactor/sweeper.rb', line 33

def initialize(storage: nil, async_router: nil, logger: nil)
  @storage = storage || RubyReactor.configuration.storage_adapter
  @async_router = async_router || RubyReactor.configuration.async_router
  @logger = logger || RubyReactor.configuration.logger
end
Class Method Details
.run_once(limit: DEFAULT_LIMIT) ⇒ Object
# File 'lib/ruby_reactor/sweeper.rb', line 29

def self.run_once(limit: DEFAULT_LIMIT)
  new.run_once(limit: limit)
end
Instance Method Details
#run_once(limit: DEFAULT_LIMIT) ⇒ Object
Scans stored top-level reactors and re-enqueues the running-but-unlocked ones. Returns the number of contexts re-enqueued.
# File 'lib/ruby_reactor/sweeper.rb', line 41

def run_once(limit: DEFAULT_LIMIT)
  reenqueued = 0
  @storage.scan_reactors(count: limit).each do |reactor|
    next unless reactor[:status] == "running" # non-terminal only
    next if @storage.lock_held?("async:#{reactor[:id]}") # worker alive -> leave alone

    @async_router.perform_async(reactor[:id], reactor[:class])
    reenqueued += 1
  rescue StandardError => e
    # One bad record must not abort the whole sweep.
    @logger.warn("RubyReactor::Sweeper failed to re-enqueue #{reactor[:id]}: #{e.class}: #{e.message}")
  end
  reenqueued
end
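To make the sweep decision concrete, the following sketch replays the same logic against in-memory stand-ins. `FakeStorage`, `FakeRouter`, the free function `sweep`, the `OrderReactor` class name, and the `"completed"` status value are all hypothetical; only `scan_reactors(count:)`, `lock_held?`, and `perform_async(id, class)` are taken from the listing above.

```ruby
require "logger"
require "stringio"

# In-memory stand-in for the storage adapter; models only the two calls used.
class FakeStorage
  def initialize(reactors, held_locks)
    @reactors = reactors
    @held_locks = held_locks
  end

  def scan_reactors(count:)
    @reactors.first(count)
  end

  def lock_held?(key)
    @held_locks.include?(key)
  end
end

# Records enqueued jobs instead of pushing them to a queue.
class FakeRouter
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  def perform_async(id, klass)
    @jobs << [id, klass]
  end
end

# Same decision logic as the listing, written as a free function.
def sweep(storage, router, logger, limit: 1000)
  reenqueued = 0
  storage.scan_reactors(count: limit).each do |reactor|
    next unless reactor[:status] == "running"
    next if storage.lock_held?("async:#{reactor[:id]}")

    router.perform_async(reactor[:id], reactor[:class])
    reenqueued += 1
  rescue StandardError => e
    logger.warn("failed to re-enqueue #{reactor[:id]}: #{e.class}: #{e.message}")
  end
  reenqueued
end

storage = FakeStorage.new(
  [
    { id: "r1", class: "OrderReactor", status: "running" },  # lock held: worker alive
    { id: "r2", class: "OrderReactor", status: "running" },  # no lock: orphaned
    { id: "r3", class: "OrderReactor", status: "completed" } # terminal: skipped
  ],
  ["async:r1"]
)
router = FakeRouter.new
count = sweep(storage, router, Logger.new(StringIO.new))
puts "re-enqueued #{count}: #{router.jobs.inspect}"
```

Only `r2` is re-enqueued. Because the constructor accepts `storage:`, `async_router:`, and `logger:`, the same kind of stand-ins could plausibly be injected into `RubyReactor::Sweeper.new` in a test suite.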