Module: Familia::Horreum::RepairMethods
- Included in:
- ManagementMethods
- Defined in:
- lib/familia/horreum/management/repair.rb
Overview
RepairMethods provides repair and rebuild operations for Horreum models.
Included in ManagementMethods so every Horreum subclass gets these as class methods (e.g. Customer.repair_instances!, Customer.rebuild_instances).
Instance Method Summary collapse
-
#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
Full SCAN-based rebuild of the instances timeline with atomic swap.
-
#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, verify: false) {|Hash| ... } ⇒ Hash
Runs health_check then all repair methods.
-
#repair_indexes!(audit_results = nil) ⇒ Hash
Repairs indexes by running existing rebuild methods for stale indexes.
-
#repair_instances!(audit_result = nil) ⇒ Hash
Repairs the instances timeline by removing phantoms and adding missing entries.
-
#repair_multi_indexes!(audit_results = nil) ⇒ Hash
Repairs multi-indexes by invoking the existing rebuild_
methods. -
#repair_participations!(audit_results = nil) ⇒ Hash
Repairs participation collections by removing stale members.
-
#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash
Removes orphaned collection keys detected by audit_related_fields.
-
#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
SCAN helper for enumerating keys matching a pattern.
Instance Method Details
#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
Full SCAN-based rebuild of the instances timeline with atomic swap.
Scans all hash keys matching this class's pattern, extracts identifiers, and rebuilds the sorted set with timestamps from the objects.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/familia/horreum/management/repair.rb', line 67 def rebuild_instances(batch_size: 100, &progress) pattern = scan_pattern final_key = instances.dbkey temp_key = Familia::AtomicOperations.build_temp_key(final_key) count = 0 cursor = "0" batch = [] loop do cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size) keys.each do |key| identifier = extract_identifier_from_key(key) next if identifier.nil? || identifier.empty? batch << { key: key, identifier: identifier } end # Process batch when it reaches threshold if batch.size >= batch_size count += process_rebuild_batch(batch, temp_key) progress&.call(phase: :rebuilding, current: count, total: nil) batch.clear end break if cursor == "0" end # Process remaining batch unless batch.empty? count += process_rebuild_batch(batch, temp_key) progress&.call(phase: :rebuilding, current: count, total: nil) end # Atomic swap Familia::AtomicOperations.atomic_swap(temp_key, final_key, dbclient) progress&.call(phase: :completed, current: count, total: count) count end |
#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, verify: false) {|Hash| ... } ⇒ Hash
Runs health_check then all repair methods.
By default this repairs four always-on dimensions (instances, unique indexes, multi indexes, and participations). The related_fields dimension is opt-in via +audit_collections:+; cross_references drift is reported but not auto-repaired.
Each repair stage runs inside its own rescue boundary so a failure in one dimension does not prevent the others from running. The returned hash includes:
- +:report+ -- the AuditReport produced by health_check
- one entry per stage that ran (e.g. +:instances+, +:indexes+, +:multi_indexes+, +:participations+, optionally +:related_fields+)
- +:errors+ -- map of stage name to exception details for any stage that raised
- +:status+ -- +:ok+ when every stage finished cleanly, +:partial_failure+ when one or more stages errored
When +verify:+ is true, a second health_check runs after repair and the resulting report is exposed as +:post_audit+, with +:verified+ set to +post_audit.healthy?+. This lets callers confirm that the repair actually drove the model back to a healthy state.
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/familia/horreum/management/repair.rb', line 264 def repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, verify: false, &progress) report = health_check( batch_size: batch_size, audit_collections: audit_collections, check_cross_refs: check_cross_refs, &progress ) stages = {} errors = {} # Run each repair stage with exception isolation. A failure in # one stage should not prevent later stages from running, so the # operator gets the broadest possible view of what was repaired. run_repair_stage(stages, errors, :instances) { repair_instances!(report.instances) } run_repair_stage(stages, errors, :indexes) { repair_indexes!(report.unique_indexes) } run_repair_stage(stages, errors, :multi_indexes) { repair_multi_indexes!(report.multi_indexes) } run_repair_stage(stages, errors, :participations) { repair_participations!(report.participations) } # Only repair related fields when the audit actually checked them. # Nil means the caller did not pass audit_collections: true to # health_check, so repair has nothing to act on. unless report..nil? run_repair_stage(stages, errors, :related_fields) { (report., &progress) } end result = stages.merge( report: report, errors: errors, status: errors.empty? ? :ok : :partial_failure, ) if verify post = health_check( batch_size: batch_size, audit_collections: audit_collections, check_cross_refs: check_cross_refs, &progress ) result[:post_audit] = post result[:verified] = post.healthy? end result end |
#repair_indexes!(audit_results = nil) ⇒ Hash
Repairs indexes by running existing rebuild methods for stale indexes.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/familia/horreum/management/repair.rb', line 114 def repair_indexes!(audit_results = nil) audit_results ||= audit_unique_indexes rebuilt = [] audit_results.each do |idx_result| index_name = idx_result[:index_name] next if idx_result[:stale].empty? && idx_result[:missing].empty? rebuild_method = :"rebuild_#{index_name}" if respond_to?(rebuild_method) send(rebuild_method) rebuilt << index_name end end { rebuilt: rebuilt } end |
#repair_instances!(audit_result = nil) ⇒ Hash
Repairs the instances timeline by removing phantoms and adding missing entries.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/familia/horreum/management/repair.rb', line 18 def repair_instances!(audit_result = nil) audit_result ||= audit_instances phantoms_removed = 0 missing_added = 0 # Remove phantoms (in timeline but key expired/deleted). # Batch all ZREMs in a single pipeline to avoid N round-trips. phantoms = audit_result[:phantoms] unless phantoms.empty? instances_key = instances.dbkey pipelined do |pipe| phantoms.each do |identifier| pipe.zrem(instances_key, identifier) end end phantoms_removed = phantoms.size end # Add missing (key exists but not in timeline). # Batch-load all objects via load_multi, then batch ZADDs in a pipeline. missing = audit_result[:missing] unless missing.empty? objects = load_multi(missing) instances_key = instances.dbkey pipelined do |pipe| missing.each_with_index do |identifier, idx| obj = objects[idx] next unless obj # Key expired between SCAN and load score = (obj) pipe.zadd(instances_key, score, identifier) missing_added += 1 end end end { phantoms_removed: phantoms_removed, missing_added: missing_added } end |
#repair_multi_indexes!(audit_results = nil) ⇒ Hash
Repairs multi-indexes by invoking the existing rebuild_
For class-level multi-indexes the rebuild method lives on the indexed class (e.g. +Customer.rebuild_role_index+). For instance-scoped multi-indexes the rebuild method lives on each scope instance (e.g. +company.rebuild_dept_index+); we iterate the scope class's instances timeline and rebuild per scope.
Indexes whose audit status is :ok are skipped. Indexes for which a rebuild method is unavailable or a scope class has no instances collection are recorded in +:skipped+ with a reason.
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/familia/horreum/management/repair.rb', line 329 def repair_multi_indexes!(audit_results = nil) audit_results ||= audit_multi_indexes rebuilt = [] rebuilt_per_scope = [] skipped = [] unless respond_to?(:indexing_relationships) return { rebuilt: rebuilt, rebuilt_per_scope: rebuilt_per_scope, skipped: skipped } end multi_rels = indexing_relationships.select { |r| r.cardinality == :multi } audit_results.each do |idx_result| next if idx_result[:status] == :ok index_name = idx_result[:index_name] rel = multi_rels.find { |r| r.index_name == index_name } unless rel skipped << { index_name: index_name, reason: :relationship_missing } next end if rel.class_level? method = :"rebuild_#{index_name}" if respond_to?(method) send(method) rebuilt << index_name else skipped << { index_name: index_name, reason: :rebuild_method_missing } end else scopes_rebuilt = rebuild_instance_scoped_multi_index(rel, skipped) rebuilt_per_scope << { index_name: index_name, scopes_rebuilt: scopes_rebuilt } if scopes_rebuilt end end { rebuilt: rebuilt, rebuilt_per_scope: rebuilt_per_scope, skipped: skipped } end |
#repair_participations!(audit_results = nil) ⇒ Hash
Repairs participation collections by removing stale members.
Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.
Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/familia/horreum/management/repair.rb', line 147 def repair_participations!(audit_results = nil) audit_results ||= audit_participations # Build a lookup from collection_name to the target class's dbclient. # The audit reads collections using target_class.dbclient, so repairs # must use the same client (critical in multi-database setups). client_by_name = {} if respond_to?(:participation_relationships) participation_relationships.each do |rel| client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient end end # Collect all valid stale entries and group by collection_key # so we can batch Redis operations instead of 2N round-trips. grouped = Hash.new { |h, k| h[k] = [] } key_clients = {} audit_results.each do |part_result| part_result[:stale_members].each do |entry| identifier = entry[:identifier] collection_key = entry[:collection_key] collection_name = entry[:collection_name] next unless collection_key && identifier grouped[collection_key] << identifier key_clients[collection_key] ||= client_by_name[collection_name.to_s] end end return { stale_removed: 0 } if grouped.empty? stale_removed = batch_remove_stale_members(grouped, key_clients) { stale_removed: stale_removed } end |
#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash
Removes orphaned collection keys detected by audit_related_fields.
Each entry's orphaned_keys list contains full Redis keys whose parent Horreum hash no longer exists. This method DELs each of those keys, tracking successes and failures per key. A Redis::CommandError on any single DEL is captured in failed_keys so the rest of the batch can still be processed.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/familia/horreum/management/repair.rb', line 195 def (audit_results = nil, &progress) audit_results ||= orphaned_keys = audit_results.flat_map { |entry| Array(entry[:orphaned_keys]) } total = orphaned_keys.size removed_keys = [] failed_keys = [] orphaned_keys.each_with_index do |key, idx| begin dbclient.del(key) removed_keys << key rescue Redis::CommandError => e failed_keys << { key: key, error: e. } end progress&.call(phase: :repair_related_fields, current: idx + 1, total: total) end status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found { removed_keys: removed_keys, failed_keys: failed_keys, status: status, } end |
#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
SCAN helper for enumerating keys matching a pattern.
376 377 378 379 380 381 382 383 384 385 386 |
# File 'lib/familia/horreum/management/repair.rb', line 376 def scan_keys(filter = '*', batch_size: 100, &block) pattern = dbkey(filter) return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given? cursor = "0" loop do cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size) keys.each(&block) break if cursor == "0" end end |