Class: RubyReactor::Map::Sweeper

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_reactor/map/sweeper.rb

Overview

Recovers map fan-out from a hard kill (Phase 5d). Maps are the path most exposed to a lost job: one missing element result hangs the whole map and its parent forever. The unifying signal is the results hash — index-keyed and idempotent (HSET) — so completion is authoritative on ‘missing`, not on the fragile counter:

missing = (0...count) - HKEYS(results)

For each active map:

* missing indices with NO live element lock are re-dispatched (M1/M4/M5).
* if nothing is missing but the parent never resumed, the collector is
  re-triggered (M2) — gated so it never fires while a collector or the
  parent is alive, or after the parent already collected.

‘run_once` is pure and idempotent; the host wires the cadence (same contract as RubyReactor::Sweeper).

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(storage: nil, async_router: nil, logger: nil) ⇒ Sweeper

Returns a new instance of Sweeper.



26
27
28
29
30
# File 'lib/ruby_reactor/map/sweeper.rb', line 26

def initialize(storage: nil, async_router: nil, logger: nil)
  @storage = storage || RubyReactor.configuration.storage_adapter
  @async_router = async_router || RubyReactor.configuration.async_router
  @logger = logger || RubyReactor.configuration.logger
end

Class Method Details

.run_once(limit: 1000) ⇒ Object



22
23
24
# File 'lib/ruby_reactor/map/sweeper.rb', line 22

def self.run_once(limit: 1000)
  new.run_once(limit: limit)
end

Instance Method Details

#run_once(limit: 1000) ⇒ Object

Returns { redispatched:, recollected: } counts.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/ruby_reactor/map/sweeper.rb', line 33

def run_once(limit: 1000)
  redispatched = 0
  recollected = 0

  @storage.scan_maps(count: limit).each do |meta|
    missing = missing_indices(meta)
    if missing.any?
      redispatched += redispatch_missing(meta, missing)
    elsif recollect?(meta)
      retrigger_collector(meta)
      recollected += 1
    end
  rescue StandardError => e
    @logger.warn("RubyReactor::Map::Sweeper failed on map #{meta["map_id"]}: #{e.class}: #{e.message}")
  end

  { redispatched: redispatched, recollected: recollected }
end