Module: Familia::Horreum::RepairMethods

Included in:
ManagementMethods
Defined in:
lib/familia/horreum/management/repair.rb

Overview

RepairMethods provides repair and rebuild operations for Horreum models.

RepairMethods is included in ManagementMethods, so every Horreum subclass gets these methods as class methods (e.g. Customer.repair_instances!, Customer.rebuild_instances).

Instance Method Details

#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer

Full SCAN-based rebuild of the instances timeline with atomic swap.

Scans all hash keys matching this class's pattern, extracts identifiers, and rebuilds the sorted set with timestamps from the objects.

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (Hash)

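    Progress: {phase:, current:, total:}. total: is nil during the :rebuilding phase and equals the final count in the :completed call.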

Returns:

  • (Integer)

    Number of instances rebuilt



# File 'lib/familia/horreum/management/repair.rb', line 67

def rebuild_instances(batch_size: 100, &progress)
  pattern = scan_pattern
  final_key = instances.dbkey
  temp_key = Familia::AtomicOperations.build_temp_key(final_key)

  count = 0
  cursor = "0"
  batch = []

  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)

    keys.each do |key|
      identifier = extract_identifier_from_key(key)
      next if identifier.nil? || identifier.empty?

      batch << { key: key, identifier: identifier }
    end

    # Process batch when it reaches threshold
    if batch.size >= batch_size
      count += process_rebuild_batch(batch, temp_key)
      progress&.call(phase: :rebuilding, current: count, total: nil)
      batch.clear
    end

    break if cursor == "0"
  end

  # Process remaining batch
  unless batch.empty?
    count += process_rebuild_batch(batch, temp_key)
    progress&.call(phase: :rebuilding, current: count, total: nil)
  end

  # Atomic swap
  Familia::AtomicOperations.atomic_swap(temp_key, final_key, dbclient)

  progress&.call(phase: :completed, current: count, total: count)
  count
end
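
The progress block receives a hash with phase:, current:, and total:, and total: stays nil until the final :completed call. A minimal sketch of a handler that tolerates the nil total follows. format_progress is a hypothetical helper rather than part of Familia, and the commented-out call assumes a Horreum subclass named Customer.

```ruby
# Hypothetical helper (not part of Familia): turns one progress payload
# into a log line. total is nil during the :rebuilding phase.
def format_progress(info)
  total = info[:total] ? info[:total].to_s : "?"
  "#{info[:phase]}: #{info[:current]}/#{total}"
end

lines = []
handler = proc { |info| lines << format_progress(info) }

# With a real model this would be:
#   Customer.rebuild_instances(batch_size: 500, &handler)
# Here the calls are simulated with the payload shapes from the source above.
handler.call(phase: :rebuilding, current: 100, total: nil)
handler.call(phase: :completed, current: 100, total: 100)

puts lines
```

A plain proc taking one positional argument is used so that any extra keys in a payload are simply ignored.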

#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, verify: false) {|Hash| ... } ⇒ Hash

Runs health_check, then runs each repair method against the resulting report.

By default this repairs four always-on dimensions (instances, unique indexes, multi indexes, and participations). The related_fields dimension is opt-in via +audit_collections:+; cross_references drift is reported but not auto-repaired.

Each repair stage runs inside its own rescue boundary so a failure in one dimension does not prevent the others from running. The returned hash includes:

  • +:report+ -- the AuditReport produced by health_check
  • one entry per stage that ran (e.g. +:instances+, +:indexes+, +:multi_indexes+, +:participations+, optionally +:related_fields+)
  • +:errors+ -- map of stage name to exception details for any stage that raised
  • +:status+ -- +:ok+ when every stage finished cleanly, +:partial_failure+ when one or more stages errored

When +verify:+ is true, a second health_check runs after repair and the resulting report is exposed as +:post_audit+, with +:verified+ set to +post_audit.healthy?+. This lets callers confirm that the repair actually drove the model back to a healthy state.
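
The per-stage rescue boundary described above can be sketched as follows. The body of run_repair_stage is not shown on this page, so run_stage here is a hypothetical stand-in, and the error detail fields (:class, :message) are assumptions about what might be recorded.

```ruby
# Hypothetical stand-in for the private run_repair_stage helper; the real
# implementation is not shown here and may record different details.
def run_stage(stages, errors, name)
  stages[name] = yield
rescue StandardError => e
  errors[name] = { class: e.class.name, message: e.message }
end

stages = {}
errors = {}

# The second stage raises; the third still runs.
run_stage(stages, errors, :instances) { { phantoms_removed: 2, missing_added: 1 } }
run_stage(stages, errors, :indexes) { raise ArgumentError, "index audit malformed" }
run_stage(stages, errors, :participations) { { stale_removed: 0 } }

result = stages.merge(
  errors: errors,
  status: errors.empty? ? :ok : :partial_failure
)

p result[:status]      # :partial_failure
p result[:errors].keys # [:indexes]
```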

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN batch size passed to health_check

  • audit_collections (Boolean) (defaults to: false)

    When true, health_check runs audit_related_fields and repair_all! calls repair_related_fields! with the result. When false (default), related_fields stays nil in the report and is not repaired.

  • check_cross_refs (Boolean) (defaults to: false)

    When true, health_check runs audit_cross_references and the result is included in the returned report for inspection. NOTE: no automatic repair is performed for cross_reference drift; callers must inspect the audit report and resolve the drift manually. The flag is accepted here for symmetry with the audit side and so repair_all! can surface the dimension through its returned report.

  • verify (Boolean) (defaults to: false)

    When true, re-run health_check after repair and expose the result as +:post_audit+ / +:verified+.

Yields:

  • (Hash)

    Progress callbacks

Returns:

  • (Hash)

    Combined repair results plus the AuditReport



# File 'lib/familia/horreum/management/repair.rb', line 264

def repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false,
                verify: false, &progress)
  report = health_check(
    batch_size: batch_size,
    audit_collections: audit_collections,
    check_cross_refs: check_cross_refs,
    &progress
  )

  stages = {}
  errors = {}

  # Run each repair stage with exception isolation. A failure in
  # one stage should not prevent later stages from running, so the
  # operator gets the broadest possible view of what was repaired.
  run_repair_stage(stages, errors, :instances) { repair_instances!(report.instances) }
  run_repair_stage(stages, errors, :indexes) { repair_indexes!(report.unique_indexes) }
  run_repair_stage(stages, errors, :multi_indexes) { repair_multi_indexes!(report.multi_indexes) }
  run_repair_stage(stages, errors, :participations) { repair_participations!(report.participations) }

  # Only repair related fields when the audit actually checked them.
  # Nil means the caller did not pass audit_collections: true to
  # health_check, so repair has nothing to act on.
  unless report.related_fields.nil?
    run_repair_stage(stages, errors, :related_fields) {
      repair_related_fields!(report.related_fields, &progress)
    }
  end

  result = stages.merge(
    report: report,
    errors: errors,
    status: errors.empty? ? :ok : :partial_failure,
  )

  if verify
    post = health_check(
      batch_size: batch_size,
      audit_collections: audit_collections,
      check_cross_refs: check_cross_refs,
      &progress
    )
    result[:post_audit] = post
    result[:verified] = post.healthy?
  end

  result
end
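
A caller will usually branch on +:status+ and, when verify: true was passed, on +:verified+. The sketch below shows one way to condense the result; repair_summary is a hypothetical helper, and the hand-built hash omits +:report+ and +:post_audit+ because those are AuditReport objects.

```ruby
# Hypothetical helper (not part of Familia): condenses a repair_all! result
# into a one-line summary. Only keys documented above are read.
def repair_summary(result)
  parts = ["status=#{result[:status]}"]
  parts << "failed=#{result[:errors].keys.join(',')}" unless result[:errors].empty?
  parts << "verified=#{result[:verified]}" if result.key?(:verified)
  parts.join(" ")
end

# With a real model this would be:
#   result = Customer.repair_all!(verify: true)
# A hand-built hash stands in for it here.
result = {
  instances: { phantoms_removed: 3, missing_added: 0 },
  errors: {},
  status: :ok,
  verified: true
}

puts repair_summary(result)
```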

#repair_indexes!(audit_results = nil) ⇒ Hash

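Repairs unique indexes by calling the existing rebuild_<index_name> method for every index the audit reports as having stale or missing entries. An index with no matching rebuild method is skipped silently.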

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_unique_indexes

Returns:

  • (Hash)

    {rebuilt: [index_names]}



# File 'lib/familia/horreum/management/repair.rb', line 114

def repair_indexes!(audit_results = nil)
  audit_results ||= audit_unique_indexes

  rebuilt = []

  audit_results.each do |idx_result|
    index_name = idx_result[:index_name]
    next if idx_result[:stale].empty? && idx_result[:missing].empty?

    rebuild_method = :"rebuild_#{index_name}"
    if respond_to?(rebuild_method)
      send(rebuild_method)
      rebuilt << index_name
    end
  end

  { rebuilt: rebuilt }
end
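
To preview which rebuild methods a given audit would trigger, the skip condition can be applied to the audit data on its own. This is a sketch over hand-built entries; the index names are illustrative and only the :index_name, :stale, and :missing keys read by the method are assumed.

```ruby
# Hand-built sample in the shape repair_indexes! reads: each entry has
# :index_name plus :stale and :missing arrays. The values are illustrative.
audit_results = [
  { index_name: :email_index, stale: [], missing: [] },
  { index_name: :handle_index, stale: ["old_handle"], missing: [] },
  { index_name: :token_index, stale: [], missing: ["cust_9"] }
]

# Same skip condition as the method: an index is left alone only when it
# has neither stale nor missing entries.
needs_rebuild = audit_results.reject { |r| r[:stale].empty? && r[:missing].empty? }

# Method names that would be checked with respond_to? and called with send.
rebuild_methods = needs_rebuild.map { |r| :"rebuild_#{r[:index_name]}" }

p rebuild_methods
```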

#repair_instances!(audit_result = nil) ⇒ Hash

Repairs the instances timeline by removing phantoms and adding missing entries.

Parameters:

  • audit_result (Hash, nil) (defaults to: nil)

    Result from audit_instances (runs audit if nil)

Returns:

  • (Hash)

    {phantoms_removed: N, missing_added: N}



# File 'lib/familia/horreum/management/repair.rb', line 18

def repair_instances!(audit_result = nil)
  audit_result ||= audit_instances

  phantoms_removed = 0
  missing_added = 0

  # Remove phantoms (in timeline but key expired/deleted).
  # Batch all ZREMs in a single pipeline to avoid N round-trips.
  phantoms = audit_result[:phantoms]
  unless phantoms.empty?
    instances_key = instances.dbkey
    pipelined do |pipe|
      phantoms.each do |identifier|
        pipe.zrem(instances_key, identifier)
      end
    end
    phantoms_removed = phantoms.size
  end

  # Add missing (key exists but not in timeline).
  # Batch-load all objects via load_multi, then batch ZADDs in a pipeline.
  missing = audit_result[:missing]
  unless missing.empty?
    objects = load_multi(missing)
    instances_key = instances.dbkey
    pipelined do |pipe|
      missing.each_with_index do |identifier, idx|
        obj = objects[idx]
        next unless obj # Key expired between SCAN and load

        score = extract_timestamp_score(obj)
        pipe.zadd(instances_key, score, identifier)
        missing_added += 1
      end
    end
  end

  { phantoms_removed: phantoms_removed, missing_added: missing_added }
end
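
The reconciliation can be illustrated without a database. In this sketch a Hash of identifier to score stands in for the sorted set and another Hash stands in for the stored objects; both are hypothetical stand-ins, and the pipelining used by the real method is left out.

```ruby
# In-memory stand-ins: timeline maps identifier => score (like a sorted set)
# and objects maps identifier => creation timestamp. Both are hypothetical.
timeline = { "a" => 100.0, "ghost" => 150.0 }
objects  = { "a" => 100.0, "b" => 200.0 }

# Shape of the audit result consumed by repair_instances!.
audit_result = {
  phantoms: timeline.keys - objects.keys, # in timeline, object gone
  missing: objects.keys - timeline.keys   # object exists, not in timeline
}

audit_result[:phantoms].each { |id| timeline.delete(id) } # ZREM equivalent

missing_added = 0
audit_result[:missing].each do |id|
  score = objects[id]
  next unless score # object vanished between audit and repair

  timeline[id] = score # ZADD equivalent
  missing_added += 1
end

summary = { phantoms_removed: audit_result[:phantoms].size, missing_added: missing_added }
p summary
```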

#repair_multi_indexes!(audit_results = nil) ⇒ Hash

Repairs multi-indexes by invoking the existing rebuild_ methods. Handles both class-level and instance-scoped indexes.

For class-level multi-indexes the rebuild method lives on the indexed class (e.g. +Customer.rebuild_role_index+). For instance-scoped multi-indexes the rebuild method lives on each scope instance (e.g. +company.rebuild_dept_index+); we iterate the scope class's instances timeline and rebuild per scope.

Indexes whose audit status is :ok are skipped. Indexes for which a rebuild method is unavailable or a scope class has no instances collection are recorded in +:skipped+ with a reason.

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_multi_indexes

Returns:

  • (Hash)

    {rebuilt:, rebuilt_per_scope:, skipped:}



# File 'lib/familia/horreum/management/repair.rb', line 329

def repair_multi_indexes!(audit_results = nil)
  audit_results ||= audit_multi_indexes

  rebuilt = []
  rebuilt_per_scope = []
  skipped = []

  unless respond_to?(:indexing_relationships)
    return { rebuilt: rebuilt, rebuilt_per_scope: rebuilt_per_scope, skipped: skipped }
  end

  multi_rels = indexing_relationships.select { |r| r.cardinality == :multi }

  audit_results.each do |idx_result|
    next if idx_result[:status] == :ok

    index_name = idx_result[:index_name]
    rel = multi_rels.find { |r| r.index_name == index_name }
    unless rel
      skipped << { index_name: index_name, reason: :relationship_missing }
      next
    end

    if rel.class_level?
      method = :"rebuild_#{index_name}"
      if respond_to?(method)
        send(method)
        rebuilt << index_name
      else
        skipped << { index_name: index_name, reason: :rebuild_method_missing }
      end
    else
      scopes_rebuilt = rebuild_instance_scoped_multi_index(rel, skipped)
      rebuilt_per_scope << { index_name: index_name, scopes_rebuilt: scopes_rebuilt } if scopes_rebuilt
    end
  end

  { rebuilt: rebuilt, rebuilt_per_scope: rebuilt_per_scope, skipped: skipped }
end
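
Because skipped indexes are reported rather than raised, callers may want to group them by reason before deciding what to do. A small sketch over a hand-built result follows; the index names are illustrative, and the two reasons are the ones visible in the source above.

```ruby
# Hand-built sample in the shape returned by repair_multi_indexes!.
result = {
  rebuilt: [:role_index],
  rebuilt_per_scope: [],
  skipped: [
    { index_name: :region_index, reason: :rebuild_method_missing },
    { index_name: :legacy_index, reason: :relationship_missing }
  ]
}

# Map each skip reason to the index names it affected.
by_reason = result[:skipped]
  .group_by { |entry| entry[:reason] }
  .transform_values { |entries| entries.map { |e| e[:index_name] } }

p by_reason
```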

#repair_participations!(audit_results = nil) ⇒ Hash

Repairs participation collections by removing stale members.

Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.

Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_participations

Returns:

  • (Hash)

    {stale_removed: N}



# File 'lib/familia/horreum/management/repair.rb', line 147

def repair_participations!(audit_results = nil)
  audit_results ||= audit_participations

  # Build a lookup from collection_name to the target class's dbclient.
  # The audit reads collections using target_class.dbclient, so repairs
  # must use the same client (critical in multi-database setups).
  client_by_name = {}
  if respond_to?(:participation_relationships)
    participation_relationships.each do |rel|
      client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient
    end
  end

  # Collect all valid stale entries and group by collection_key
  # so we can batch Redis operations instead of 2N round-trips.
  grouped = Hash.new { |h, k| h[k] = [] }
  key_clients = {}
  audit_results.each do |part_result|
    part_result[:stale_members].each do |entry|
      identifier = entry[:identifier]
      collection_key = entry[:collection_key]
      collection_name = entry[:collection_name]
      next unless collection_key && identifier

      grouped[collection_key] << identifier
      key_clients[collection_key] ||= client_by_name[collection_name.to_s]
    end
  end

  return { stale_removed: 0 } if grouped.empty?

  stale_removed = batch_remove_stale_members(grouped, key_clients)
  { stale_removed: stale_removed }
end
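
The grouping step and the reason for using raw commands can both be shown on sample data. This is a sketch with hand-built audit entries; the keys and identifiers are illustrative, and the batched removal itself (batch_remove_stale_members) is not reproduced because its body is not shown here.

```ruby
require "json"

# Hand-built audit entries in the shape repair_participations! reads.
audit_results = [
  { stale_members: [
    { identifier: "cust_1", collection_key: "team:t1:members", collection_name: :members },
    { identifier: "cust_2", collection_key: "team:t1:members", collection_name: :members },
    { identifier: nil,      collection_key: "team:t2:members", collection_name: :members }
  ] }
]

# Group identifiers by the exact key they must be removed from, skipping
# entries that lack a key or an identifier.
grouped = Hash.new { |h, k| h[k] = [] }
audit_results.each do |part_result|
  part_result[:stale_members].each do |entry|
    next unless entry[:collection_key] && entry[:identifier]

    grouped[entry[:collection_key]] << entry[:identifier]
  end
end

p grouped

# A JSON-encoded string carries extra quote characters, so a removal that
# encodes its argument would not match the raw stored member.
encoded = "cust_1".to_json
p encoded == "cust_1" # false
```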

#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash

Removes orphaned collection keys detected by audit_related_fields.

Each entry's orphaned_keys list contains full Redis keys whose parent Horreum hash no longer exists. This method DELs each of those keys, tracking successes and failures per key. A Redis::CommandError on any single DEL is captured in failed_keys so the rest of the batch can still be processed.

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_related_fields (runs a fresh audit when nil)

Yields:

  • (Hash)

    Progress: {phase: :repair_related_fields, current:, total:}

Returns:

  • (Hash)

    {removed_keys: [key, ...], failed_keys: [{key:, error:}, ...], status:}



# File 'lib/familia/horreum/management/repair.rb', line 195

def repair_related_fields!(audit_results = nil, &progress)
  audit_results ||= audit_related_fields

  orphaned_keys = audit_results.flat_map { |entry| Array(entry[:orphaned_keys]) }
  total = orphaned_keys.size

  removed_keys = []
  failed_keys = []

  orphaned_keys.each_with_index do |key, idx|
    begin
      dbclient.del(key)
      removed_keys << key
    rescue Redis::CommandError => e
      failed_keys << { key: key, error: e.message }
    end
    progress&.call(phase: :repair_related_fields, current: idx + 1, total: total)
  end

  status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found

  {
    removed_keys: removed_keys,
    failed_keys: failed_keys,
    status: status,
  }
end
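
The per-key rescue and the resulting status can be exercised with a stand-in for dbclient.del. In this sketch deleter is hypothetical and raises a plain RuntimeError for one key, whereas the real method rescues Redis::CommandError; the key names are illustrative.

```ruby
# Hypothetical deleter standing in for dbclient.del; it fails for one key so
# the per-key rescue is exercised.
deleter = lambda do |key|
  raise "READONLY replica" if key.end_with?(":locked")

  1
end

orphaned_keys = ["customer:x1:sessions", "customer:x2:locked"]
removed_keys = []
failed_keys = []

orphaned_keys.each do |key|
  deleter.call(key)
  removed_keys << key
rescue StandardError => e
  # One failed delete is recorded and the loop moves on to the next key.
  failed_keys << { key: key, error: e.message }
end

# Same expression as the source: :ok only when there was nothing to remove.
status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found

p status # :issues_found
```

Note that, as the source is written, a run that deletes every orphan successfully still reports :issues_found, so failed_keys is the field to check when distinguishing success from failure.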

#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator

SCAN helper for enumerating keys matching a pattern.

Parameters:

  • filter (String) (defaults to: '*')

    Glob filter appended to class prefix (default: '*')

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (String)

    Each matching key

Returns:

  • (Enumerator)

    If no block given



# File 'lib/familia/horreum/management/repair.rb', line 376

def scan_keys(filter = '*', batch_size: 100, &block)
  pattern = dbkey(filter)
  return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given?

  cursor = "0"
  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)
    keys.each(&block)
    break if cursor == "0"
  end
end
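
The block-or-Enumerator pattern used by scan_keys can be tried against an in-memory stand-in. FakeClient and KeyScanner below are hypothetical and model only the cursor contract (start at "0", stop when "0" comes back); a real server treats count as a hint and may return more or fewer keys per call.

```ruby
# Hypothetical in-memory stand-in for a Redis client; only #scan is modelled.
class FakeClient
  def initialize(keys)
    @keys = keys
  end

  # Returns [next_cursor, keys] like SCAN; cursor "0" means iteration is done.
  def scan(cursor, match:, count:)
    start = cursor.to_i
    page = @keys[start, count] || []
    next_pos = start + count
    next_cursor = next_pos >= @keys.size ? "0" : next_pos.to_s
    [next_cursor, page.select { |k| File.fnmatch(match, k) }]
  end
end

# Hypothetical wrapper mirroring the scan_keys loop above.
class KeyScanner
  def initialize(client)
    @client = client
  end

  def each_key(pattern, batch_size: 100, &block)
    return enum_for(:each_key, pattern, batch_size: batch_size) unless block_given?

    cursor = "0"
    loop do
      cursor, keys = @client.scan(cursor, match: pattern, count: batch_size)
      keys.each(&block)
      break if cursor == "0"
    end
  end
end

client = FakeClient.new(%w[customer:a:object customer:b:object order:1:object customer:c:object])
scanner = KeyScanner.new(client)

# Without a block an Enumerator is returned, so Enumerable methods apply.
customer_keys = scanner.each_key("customer:*", batch_size: 2).to_a
first_key = scanner.each_key("customer:*", batch_size: 2).first

p customer_keys
p first_key
```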