Module: Familia::Horreum::RepairMethods
- Included in:
- ManagementMethods
- Defined in:
- lib/familia/horreum/management/repair.rb
Overview
RepairMethods provides repair and rebuild operations for Horreum models.
Included in ManagementMethods so every Horreum subclass gets these as class methods (e.g. Customer.repair_instances!, Customer.rebuild_instances).
Instance Method Summary collapse
-
#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
Full SCAN-based rebuild of the instances timeline with atomic swap.
-
#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false) {|Hash| ... } ⇒ Hash
Runs health_check then all repair methods.
-
#repair_indexes!(audit_results = nil) ⇒ Hash
Repairs indexes by running existing rebuild methods for stale indexes.
-
#repair_instances!(audit_result = nil) ⇒ Hash
Repairs the instances timeline by removing phantoms and adding missing entries.
-
#repair_participations!(audit_results = nil) ⇒ Hash
Repairs participation collections by removing stale members.
-
#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash
Removes orphaned collection keys detected by audit_related_fields.
-
#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
SCAN helper for enumerating keys matching a pattern.
Instance Method Details
#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
Full SCAN-based rebuild of the instances timeline with atomic swap.
Scans all hash keys matching this class's pattern, extracts identifiers, and rebuilds the sorted set with timestamps from the objects.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/familia/horreum/management/repair.rb', line 67 def rebuild_instances(batch_size: 100, &progress) pattern = scan_pattern final_key = instances.dbkey temp_key = Familia::AtomicOperations.build_temp_key(final_key) count = 0 cursor = "0" batch = [] loop do cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size) keys.each do |key| identifier = extract_identifier_from_key(key) next if identifier.nil? || identifier.empty? batch << { key: key, identifier: identifier } end # Process batch when it reaches threshold if batch.size >= batch_size count += process_rebuild_batch(batch, temp_key) progress&.call(phase: :rebuilding, current: count, total: nil) batch.clear end break if cursor == "0" end # Process remaining batch unless batch.empty? count += process_rebuild_batch(batch, temp_key) progress&.call(phase: :rebuilding, current: count, total: nil) end # Atomic swap Familia::AtomicOperations.atomic_swap(temp_key, final_key, dbclient) progress&.call(phase: :completed, current: count, total: count) count end |
#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false) {|Hash| ... } ⇒ Hash
Runs health_check then all repair methods.
By default this repairs the three always-on dimensions (instances, unique indexes, participations). The related_fields and cross_references dimensions are opt-in and must be enabled on the audit side for repair to consider them.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/familia/horreum/management/repair.rb', line 245 def repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, &progress) report = health_check( batch_size: batch_size, audit_collections: audit_collections, check_cross_refs: check_cross_refs, &progress ) instances_result = repair_instances!(report.instances) indexes_result = repair_indexes!(report.unique_indexes) participations_result = repair_participations!(report.participations) result = { report: report, instances: instances_result, indexes: indexes_result, participations: participations_result, } # Only repair related fields when the audit actually checked them. # Nil means the caller did not pass audit_collections: true to # health_check, so repair has nothing to act on. unless report..nil? result[:related_fields] = (report., &progress) end result end |
#repair_indexes!(audit_results = nil) ⇒ Hash
Repairs indexes by running existing rebuild methods for stale indexes.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/familia/horreum/management/repair.rb', line 114 def repair_indexes!(audit_results = nil) audit_results ||= audit_unique_indexes rebuilt = [] audit_results.each do |idx_result| index_name = idx_result[:index_name] next if idx_result[:stale].empty? && idx_result[:missing].empty? rebuild_method = :"rebuild_#{index_name}" if respond_to?(rebuild_method) send(rebuild_method) rebuilt << index_name end end { rebuilt: rebuilt } end |
#repair_instances!(audit_result = nil) ⇒ Hash
Repairs the instances timeline by removing phantoms and adding missing entries.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/familia/horreum/management/repair.rb', line 18 def repair_instances!(audit_result = nil) audit_result ||= audit_instances phantoms_removed = 0 missing_added = 0 # Remove phantoms (in timeline but key expired/deleted). # Batch all ZREMs in a single pipeline to avoid N round-trips. phantoms = audit_result[:phantoms] unless phantoms.empty? instances_key = instances.dbkey pipelined do |pipe| phantoms.each do |identifier| pipe.zrem(instances_key, identifier) end end phantoms_removed = phantoms.size end # Add missing (key exists but not in timeline). # Batch-load all objects via load_multi, then batch ZADDs in a pipeline. missing = audit_result[:missing] unless missing.empty? objects = load_multi(missing) instances_key = instances.dbkey pipelined do |pipe| missing.each_with_index do |identifier, idx| obj = objects[idx] next unless obj # Key expired between SCAN and load score = (obj) pipe.zadd(instances_key, score, identifier) missing_added += 1 end end end { phantoms_removed: phantoms_removed, missing_added: missing_added } end |
#repair_participations!(audit_results = nil) ⇒ Hash
Repairs participation collections by removing stale members.
Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.
Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/familia/horreum/management/repair.rb', line 147 def repair_participations!(audit_results = nil) audit_results ||= audit_participations # Build a lookup from collection_name to the target class's dbclient. # The audit reads collections using target_class.dbclient, so repairs # must use the same client (critical in multi-database setups). client_by_name = {} if respond_to?(:participation_relationships) participation_relationships.each do |rel| client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient end end # Collect all valid stale entries and group by collection_key # so we can batch Redis operations instead of 2N round-trips. grouped = Hash.new { |h, k| h[k] = [] } key_clients = {} audit_results.each do |part_result| part_result[:stale_members].each do |entry| identifier = entry[:identifier] collection_key = entry[:collection_key] collection_name = entry[:collection_name] next unless collection_key && identifier grouped[collection_key] << identifier key_clients[collection_key] ||= client_by_name[collection_name.to_s] end end return { stale_removed: 0 } if grouped.empty? stale_removed = batch_remove_stale_members(grouped, key_clients) { stale_removed: stale_removed } end |
#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash
Removes orphaned collection keys detected by audit_related_fields.
Each entry's orphaned_keys list contains full Redis keys whose parent Horreum hash no longer exists. This method DELs each of those keys, tracking successes and failures per key. A Redis::CommandError on any single DEL is captured in failed_keys so the rest of the batch can still be processed.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/familia/horreum/management/repair.rb', line 195 def (audit_results = nil, &progress) audit_results ||= orphaned_keys = audit_results.flat_map { |entry| Array(entry[:orphaned_keys]) } total = orphaned_keys.size removed_keys = [] failed_keys = [] orphaned_keys.each_with_index do |key, idx| begin dbclient.del(key) removed_keys << key rescue Redis::CommandError => e failed_keys << { key: key, error: e. } end progress&.call(phase: :repair_related_fields, current: idx + 1, total: total) end status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found { removed_keys: removed_keys, failed_keys: failed_keys, status: status, } end |
#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
SCAN helper for enumerating keys matching a pattern.
281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/familia/horreum/management/repair.rb', line 281 def scan_keys(filter = '*', batch_size: 100, &block) pattern = dbkey(filter) return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given? cursor = "0" loop do cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size) keys.each(&block) break if cursor == "0" end end |