Class: RubyReactor::Map::Sweeper
- Inherits:
-
Object
- Object
- RubyReactor::Map::Sweeper
- Defined in:
- lib/ruby_reactor/map/sweeper.rb
Overview
Recovers map fan-out from a hard kill (Phase 5d). Maps are the path most exposed to a lost job: one missing element result hangs the whole map and its parent forever. The unifying signal is the results hash — index-keyed and idempotent (HSET) — so completion is authoritative on ‘missing`, not on the fragile counter:
missing = (0...count) - HKEYS(results)
For each active map:
* missing indices with NO live element lock are re-dispatched (M1/M4/M5).
* if nothing is missing but the parent never resumed, the collector is
re-triggered (M2) — gated so it never fires while a collector or the
parent is alive, or after the parent already collected.
‘run_once` is pure and idempotent; the host wires the cadence (same contract as RubyReactor::Sweeper).
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(storage: nil, async_router: nil, logger: nil) ⇒ Sweeper
constructor
A new instance of Sweeper.
-
#run_once(limit: 1000) ⇒ Object
Returns { redispatched:, recollected: } counts.
Constructor Details
#initialize(storage: nil, async_router: nil, logger: nil) ⇒ Sweeper
Returns a new instance of Sweeper.
26 27 28 29 30 |
# File 'lib/ruby_reactor/map/sweeper.rb', line 26 def initialize(storage: nil, async_router: nil, logger: nil) @storage = storage || RubyReactor.configuration.storage_adapter @async_router = async_router || RubyReactor.configuration.async_router @logger = logger || RubyReactor.configuration.logger end |
Class Method Details
.run_once(limit: 1000) ⇒ Object
22 23 24 |
# File 'lib/ruby_reactor/map/sweeper.rb', line 22 def self.run_once(limit: 1000) new.run_once(limit: limit) end |
Instance Method Details
#run_once(limit: 1000) ⇒ Object
Returns { redispatched:, recollected: } counts.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/ruby_reactor/map/sweeper.rb', line 33 def run_once(limit: 1000) redispatched = 0 recollected = 0 @storage.scan_maps(count: limit).each do || missing = missing_indices() if missing.any? redispatched += redispatch_missing(, missing) elsif recollect?() retrigger_collector() recollected += 1 end rescue StandardError => e @logger.warn("RubyReactor::Map::Sweeper failed on map #{["map_id"]}: #{e.class}: #{e.}") end { redispatched: redispatched, recollected: recollected } end |