Module: Familia::Horreum::RepairMethods

Included in:
ManagementMethods
Defined in:
lib/familia/horreum/management/repair.rb

Overview

RepairMethods provides repair and rebuild operations for Horreum models.

Included in ManagementMethods so every Horreum subclass gets these as class methods (e.g. Customer.repair_instances!, Customer.rebuild_instances).
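The module chain can be sketched in plain Ruby. This is a minimal model, not Familia's actual wiring. It assumes that Horreum extends each subclass with ManagementMethods from an inherited hook, and the repair_instances! body is a placeholder.

```ruby
# Hypothetical stand-ins: the real modules live under Familia::Horreum.
module RepairMethods
  # Placeholder body; the real method repairs the instances timeline.
  def repair_instances!
    "repairing #{name}"
  end
end

module ManagementMethods
  include RepairMethods # RepairMethods is mixed into ManagementMethods
end

class Horreum
  # Assumption: each subclass is extended with ManagementMethods, which makes
  # every RepairMethods method callable on the class itself.
  def self.inherited(subclass)
    super
    subclass.extend(ManagementMethods)
  end
end

class Customer < Horreum; end

Customer.repair_instances! # => "repairing Customer"
```

Because the methods arrive through extend, they live on the class's singleton ancestors and are not available on instances such as Customer.new.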


Instance Method Details

#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer

Full SCAN-based rebuild of the instances timeline with atomic swap.

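Scans all hash keys matching this class's pattern, extracts their identifiers, and writes them in batches into a temporary sorted set, scored by timestamps from the objects. The temporary set then atomically replaces the live instances key, so readers never observe a partially rebuilt timeline.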

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN COUNT hint, also used as the number of identifiers accumulated before each batch is written (default: 100)

Yields:

  • (Hash)

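    Progress hash { phase:, current:, total: }: phase is :rebuilding after each batch (total: nil) and :completed at the end (total equals the final count)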

Returns:

  • (Integer)

    Number of instances rebuilt



# File 'lib/familia/horreum/management/repair.rb', line 67

def rebuild_instances(batch_size: 100, &progress)
  pattern = scan_pattern
  final_key = instances.dbkey
  temp_key = Familia::AtomicOperations.build_temp_key(final_key)

  count = 0
  cursor = "0"
  batch = []

  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)

    keys.each do |key|
      identifier = extract_identifier_from_key(key)
      next if identifier.nil? || identifier.empty?

      batch << { key: key, identifier: identifier }
    end

    # Process batch when it reaches threshold
    if batch.size >= batch_size
      count += process_rebuild_batch(batch, temp_key)
      progress&.call(phase: :rebuilding, current: count, total: nil)
      batch.clear
    end

    break if cursor == "0"
  end

  # Process remaining batch
  unless batch.empty?
    count += process_rebuild_batch(batch, temp_key)
    progress&.call(phase: :rebuilding, current: count, total: nil)
  end

  # Atomic swap
  Familia::AtomicOperations.atomic_swap(temp_key, final_key, dbclient)

  progress&.call(phase: :completed, current: count, total: count)
  count
end
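The build-then-swap pattern can be modeled with an in-memory stand-in for Redis. This is a sketch under stated assumptions, not Familia code: FakeDB, the "<prefix>:<identifier>:object" key layout, the "created" field, and the temp-key name are all hypothetical, and each key is written immediately instead of being batched through process_rebuild_batch.

```ruby
# Hypothetical in-memory stand-in for a Redis client. It models only what this
# sketch needs; SCAN here walks hash keys by offset and ignores concurrency.
class FakeDB
  attr_reader :hashes, :zsets

  def initialize
    @hashes = {} # key => { field => value }
    @zsets  = {} # key => { member => score }
  end

  # Cursor is a stringified offset; "0" signals the end of iteration, as with SCAN.
  def scan(cursor, match:, count:)
    all = @hashes.keys.sort
    start = cursor.to_i
    page = all[start, count] || []
    next_start = start + count
    next_cursor = next_start >= all.size ? "0" : next_start.to_s
    [next_cursor, page.select { |key| File.fnmatch(match, key) }]
  end

  def zadd(key, score, member)
    (@zsets[key] ||= {})[member] = score
  end

  # RENAME-style swap: the temp set replaces the live set in one step.
  def rename(src, dst)
    @zsets[dst] = @zsets.delete(src)
  end
end

# Assumed key layout: "<prefix>:<identifier>:object" with a "created" field.
def rebuild_instances(db, prefix:, final_key:, batch_size: 100)
  temp_key = "#{final_key}:rebuild_tmp" # hypothetical temp-key name
  count = 0
  cursor = "0"
  loop do
    cursor, keys = db.scan(cursor, match: "#{prefix}:*:object", count: batch_size)
    keys.each do |key|
      identifier = key.split(":")[1]
      next if identifier.nil? || identifier.empty?

      db.zadd(temp_key, db.hashes[key]["created"].to_f, identifier)
      count += 1
    end
    break if cursor == "0"
  end
  # Swap the rebuilt set in; if nothing was found, the live set is cleared.
  db.zsets.key?(temp_key) ? db.rename(temp_key, final_key) : db.zsets.delete(final_key)
  count
end

db = FakeDB.new
%w[1 2 3].each { |id| db.hashes["customer:#{id}:object"] = { "created" => "#{id}00" } }
db.zsets["customer:instances"] = { "99" => 1.0 } # stale phantom entry
rebuilt = rebuild_instances(db, prefix: "customer", final_key: "customer:instances", batch_size: 2)
```

The phantom "99" disappears because the live set is replaced wholesale, not patched, which is the main difference from the incremental repair_instances! approach.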

#repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false) {|Hash| ... } ⇒ Hash

Runs health_check, then the repair method for each audited dimension.

By default this repairs the three always-on dimensions (instances, unique indexes, participations). The related_fields and cross_references dimensions are opt-in and must be enabled on the audit side before repair_all! considers them; of the two, only related_fields is actually repaired.

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN batch size passed to health_check

  • audit_collections (Boolean) (defaults to: false)

    When true, health_check runs audit_related_fields and repair_all! calls repair_related_fields! with the result. When false (default), related_fields stays nil in the report and is not repaired.

  • check_cross_refs (Boolean) (defaults to: false)

    When true, health_check runs audit_cross_references and includes the result in the returned report for inspection. NOTE: cross-reference drift is never repaired automatically; callers must inspect the audit report and resolve the drift manually. The flag is accepted for symmetry with the audit side, so that repair_all! can surface this dimension through its returned report.

Yields:

  • (Hash)

    Progress hashes forwarded from health_check and, when related fields are repaired, from repair_related_fields!

Returns:

  • (Hash)

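    Combined results: { report:, instances:, indexes:, participations: }, where report is the AuditReport; a related_fields: entry is added only when the report's related_fields is non-nil (i.e. audit_collections: true)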



# File 'lib/familia/horreum/management/repair.rb', line 245

def repair_all!(batch_size: 100, audit_collections: false, check_cross_refs: false, &progress)
  report = health_check(
    batch_size: batch_size,
    audit_collections: audit_collections,
    check_cross_refs: check_cross_refs,
    &progress
  )

  instances_result = repair_instances!(report.instances)
  indexes_result = repair_indexes!(report.unique_indexes)
  participations_result = repair_participations!(report.participations)

  result = {
    report: report,
    instances: instances_result,
    indexes: indexes_result,
    participations: participations_result,
  }

  # Only repair related fields when the audit actually checked them.
  # Nil means the caller did not pass audit_collections: true to
  # health_check, so repair has nothing to act on.
  unless report.related_fields.nil?
    result[:related_fields] = repair_related_fields!(report.related_fields, &progress)
  end

  result
end
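A caller typically reduces the returned Hash to a short report. The helper below is hypothetical (it is not part of Familia); its key names follow the method bodies in this module, and the sample Hash uses made-up counts in place of a real repair_all! result.

```ruby
# Hypothetical helper: one summary line per dimension of the repair_all! result.
# Key names follow the method bodies in this module.
def summarize_repair(result)
  lines = []
  inst = result[:instances]
  lines << "instances: #{inst[:phantoms_removed]} phantoms removed, #{inst[:missing_added]} missing added"
  rebuilt = result[:indexes][:rebuilt]
  lines << "indexes rebuilt: #{rebuilt.join(', ')}" unless rebuilt.empty?
  lines << "participations: #{result[:participations][:stale_removed]} stale members removed"
  # Present only when health_check ran with audit_collections: true.
  if result.key?(:related_fields)
    rf = result[:related_fields]
    lines << "related_fields: #{rf[:removed_keys].size} removed, #{rf[:failed_keys].size} failed (#{rf[:status]})"
  end
  lines
end

# Made-up sample shaped like a repair_all! result; real use would pass
# the return value of e.g. Customer.repair_all!.
sample = {
  report: nil, # an AuditReport in real use
  instances: { phantoms_removed: 2, missing_added: 1 },
  indexes: { rebuilt: [:email_index] },
  participations: { stale_removed: 0 },
}
summary = summarize_repair(sample)
```

Checking key?(:related_fields) rather than reading the value matters here, because repair_all! omits the key entirely when related fields were not audited.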

#repair_indexes!(audit_results = nil) ⇒ Hash

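Repairs unique indexes by calling each index's rebuild_<index_name> class method when the audit reports stale or missing entries. Indexes with no matching rebuild method are skipped silently and do not appear in the result.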

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_unique_indexes (runs a fresh audit when nil)

Returns:

  • (Hash)

    { rebuilt: [index_name, ...] }, the indexes whose rebuild method was called



# File 'lib/familia/horreum/management/repair.rb', line 114

def repair_indexes!(audit_results = nil)
  audit_results ||= audit_unique_indexes

  rebuilt = []

  audit_results.each do |idx_result|
    index_name = idx_result[:index_name]
    next if idx_result[:stale].empty? && idx_result[:missing].empty?

    rebuild_method = :"rebuild_#{index_name}"
    if respond_to?(rebuild_method)
      send(rebuild_method)
      rebuilt << index_name
    end
  end

  { rebuilt: rebuilt }
end
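The dispatch rule can be reproduced with a plain class. DemoModel and the three index names are hypothetical; the loop body mirrors repair_indexes! above. The sample shows the easy-to-miss case: an index with missing entries but no rebuild method is skipped without error.

```ruby
# Hypothetical model class: only email_index has a rebuild method.
class DemoModel
  class << self
    attr_reader :calls

    def rebuild_email_index
      (@calls ||= []) << :email_index
    end

    # Same dispatch rule as repair_indexes!: skip clean indexes, and only
    # rebuild when a rebuild_<index_name> class method exists.
    def repair_indexes!(audit_results)
      rebuilt = []
      audit_results.each do |idx_result|
        index_name = idx_result[:index_name]
        next if idx_result[:stale].empty? && idx_result[:missing].empty?

        rebuild_method = :"rebuild_#{index_name}"
        if respond_to?(rebuild_method)
          send(rebuild_method)
          rebuilt << index_name
        end
      end
      { rebuilt: rebuilt }
    end
  end
end

audit = [
  { index_name: :email_index, stale: ["a@example.com"], missing: [] },
  { index_name: :phone_index, stale: [], missing: [] },        # clean: skipped
  { index_name: :handle_index, stale: [], missing: ["bob"] },  # no rebuild method: skipped
]
result = DemoModel.repair_indexes!(audit)
```

Callers who need to detect unrepaired drift can compare the audited index names against result[:rebuilt].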

#repair_instances!(audit_result = nil) ⇒ Hash

Repairs the instances timeline by removing phantoms (entries whose hash key has expired or been deleted) and adding missing entries (hash keys with no timeline entry).

Parameters:

  • audit_result (Hash, nil) (defaults to: nil)

    Result from audit_instances (runs audit if nil)

Returns:

  • (Hash)

    { phantoms_removed: N, missing_added: N }



# File 'lib/familia/horreum/management/repair.rb', line 18

def repair_instances!(audit_result = nil)
  audit_result ||= audit_instances

  phantoms_removed = 0
  missing_added = 0

  # Remove phantoms (in timeline but key expired/deleted).
  # Batch all ZREMs in a single pipeline to avoid N round-trips.
  phantoms = audit_result[:phantoms]
  unless phantoms.empty?
    instances_key = instances.dbkey
    pipelined do |pipe|
      phantoms.each do |identifier|
        pipe.zrem(instances_key, identifier)
      end
    end
    phantoms_removed = phantoms.size
  end

  # Add missing (key exists but not in timeline).
  # Batch-load all objects via load_multi, then batch ZADDs in a pipeline.
  missing = audit_result[:missing]
  unless missing.empty?
    objects = load_multi(missing)
    instances_key = instances.dbkey
    pipelined do |pipe|
      missing.each_with_index do |identifier, idx|
        obj = objects[idx]
        next unless obj # Key expired between SCAN and load

        score = extract_timestamp_score(obj)
        pipe.zadd(instances_key, score, identifier)
        missing_added += 1
      end
    end
  end

  { phantoms_removed: phantoms_removed, missing_added: missing_added }
end
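Stripped of pipelining, the two passes reduce to a delete loop and a guarded insert loop. In this in-memory sketch a Hash stands in for the sorted set, repair_timeline is a hypothetical name, and a nil object models a key that expired between the audit's SCAN and load_multi.

```ruby
# In-memory model of the two passes in repair_instances!.
# timeline: { identifier => score }, standing in for the instances sorted set.
# objects:  { identifier => created_at or nil }; nil models a key that
#           expired between the audit's SCAN and load_multi.
def repair_timeline(timeline, phantoms:, missing:, objects:)
  phantoms.each { |id| timeline.delete(id) } # ZREM pass

  missing_added = 0
  missing.each do |id|
    created = objects[id]
    next unless created # skip objects that vanished before loading

    timeline[id] = created.to_f # ZADD pass
    missing_added += 1
  end

  # Like the original, phantoms_removed counts queued removals, not ZREM replies.
  { phantoms_removed: phantoms.size, missing_added: missing_added }
end

timeline = { "1" => 100.0, "2" => 200.0, "7" => 700.0 } # "7" is a phantom
objects  = { "3" => 300, "4" => nil }                    # "4" expired mid-repair
result = repair_timeline(timeline, phantoms: ["7"], missing: ["3", "4"], objects: objects)
```

Note that missing_added can be smaller than the audit's missing list, while phantoms_removed always equals the phantom count passed in.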

#repair_participations!(audit_results = nil) ⇒ Hash

Repairs participation collections by removing stale members.

Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.

Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.
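The mismatch is easy to reproduce with a plain Ruby Set standing in for the Redis collection. This sketch does not call Familia; it only assumes, as stated above, that DataType#remove JSON-encodes string arguments while stored members are raw strings.

```ruby
require "json"
require "set"

# Model of a participation set as stored: raw identifier strings, no JSON quoting.
members = Set["c1", "c2"]

encoded = "c1".to_json  # the quoted form a JSON-encoding remove would send
members.delete(encoded) # no-op: no stored member contains literal quotes
after_encoded = members.size

members.delete("c1")    # the raw identifier matches the stored member
after_raw = members.size
```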

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_participations (runs a fresh audit when nil)

Returns:

  • (Hash)

    { stale_removed: N }



# File 'lib/familia/horreum/management/repair.rb', line 147

def repair_participations!(audit_results = nil)
  audit_results ||= audit_participations

  # Build a lookup from collection_name to the target class's dbclient.
  # The audit reads collections using target_class.dbclient, so repairs
  # must use the same client (critical in multi-database setups).
  client_by_name = {}
  if respond_to?(:participation_relationships)
    participation_relationships.each do |rel|
      client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient
    end
  end

  # Collect all valid stale entries and group by collection_key
  # so we can batch Redis operations instead of 2N round-trips.
  grouped = Hash.new { |h, k| h[k] = [] }
  key_clients = {}
  audit_results.each do |part_result|
    part_result[:stale_members].each do |entry|
      identifier = entry[:identifier]
      collection_key = entry[:collection_key]
      collection_name = entry[:collection_name]
      next unless collection_key && identifier

      grouped[collection_key] << identifier
      key_clients[collection_key] ||= client_by_name[collection_name.to_s]
    end
  end

  return { stale_removed: 0 } if grouped.empty?

  stale_removed = batch_remove_stale_members(grouped, key_clients)
  { stale_removed: stale_removed }
end
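The grouping step, extracted into a standalone function, shows what is handed to batch_remove_stale_members. The function name is hypothetical and symbols stand in for dbclient objects; how batch_remove_stale_members treats a nil client is not shown in this module, so the sketch only records it.

```ruby
# Mirrors the grouping step of repair_participations!: stale entries are
# bucketed by collection_key, entries missing a key or identifier are dropped,
# and each key is paired with the client registered for its collection_name.
def group_stale_members(audit_results, client_by_name)
  grouped = Hash.new { |h, k| h[k] = [] }
  key_clients = {}
  audit_results.each do |part_result|
    part_result[:stale_members].each do |entry|
      identifier = entry[:identifier]
      collection_key = entry[:collection_key]
      next unless collection_key && identifier

      grouped[collection_key] << identifier
      key_clients[collection_key] ||= client_by_name[entry[:collection_name].to_s]
    end
  end
  [grouped, key_clients]
end

audit = [
  { stale_members: [
    { identifier: "c1", collection_key: "team:1:members", collection_name: :members },
    { identifier: "c2", collection_key: "team:1:members", collection_name: :members },
    { identifier: nil,  collection_key: "team:1:members", collection_name: :members }, # dropped
  ] },
  { stale_members: [
    { identifier: "c9", collection_key: "org:5:staff", collection_name: :staff },
  ] },
]
clients = { "members" => :primary_client } # no entry for "staff"
grouped, key_clients = group_stale_members(audit, clients)
```

Grouping turns N stale entries into one batched removal per collection key, which is where the savings over 2N round-trips come from.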

#repair_related_fields!(audit_results = nil) {|Hash| ... } ⇒ Hash

Removes orphaned collection keys detected by audit_related_fields.

Each entry's orphaned_keys list contains full Redis keys whose parent Horreum hash no longer exists. This method DELs each of those keys, tracking successes and failures per key. A Redis::CommandError on any single DEL is captured in failed_keys so the rest of the batch can still be processed.

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_related_fields (runs a fresh audit when nil)

Yields:

  • (Hash)

    Progress hash { phase: :repair_related_fields, current:, total: }

Returns:

  • (Hash)

    { removed_keys: [key, ...], failed_keys: [{ key:, error: }, ...], status: :ok or :issues_found }



# File 'lib/familia/horreum/management/repair.rb', line 195

def repair_related_fields!(audit_results = nil, &progress)
  audit_results ||= audit_related_fields

  orphaned_keys = audit_results.flat_map { |entry| Array(entry[:orphaned_keys]) }
  total = orphaned_keys.size

  removed_keys = []
  failed_keys = []

  orphaned_keys.each_with_index do |key, idx|
    begin
      dbclient.del(key)
      removed_keys << key
    rescue Redis::CommandError => e
      failed_keys << { key: key, error: e.message }
    end
    progress&.call(phase: :repair_related_fields, current: idx + 1, total: total)
  end

  status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found

  {
    removed_keys: removed_keys,
    failed_keys: failed_keys,
    status: status,
  }
end
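The per-key error isolation can be exercised without Redis. In this sketch FlakyClient and FakeCommandError are hypothetical stand-ins for the real client and Redis::CommandError, and delete_orphans repeats the loop above against that stand-in.

```ruby
# Hypothetical stand-in for Redis::CommandError.
class FakeCommandError < StandardError; end

# Hypothetical client whose DEL fails for a configured set of keys.
class FlakyClient
  attr_reader :deleted

  def initialize(bad_keys)
    @bad_keys = bad_keys
    @deleted = []
  end

  def del(key)
    raise FakeCommandError, "simulated failure" if @bad_keys.include?(key)

    @deleted << key
    1
  end
end

# Same loop as repair_related_fields!, with the client passed in explicitly.
def delete_orphans(client, audit_results, &progress)
  orphaned_keys = audit_results.flat_map { |entry| Array(entry[:orphaned_keys]) }
  total = orphaned_keys.size
  removed_keys = []
  failed_keys = []

  orphaned_keys.each_with_index do |key, idx|
    begin
      client.del(key)
      removed_keys << key
    rescue FakeCommandError => e
      failed_keys << { key: key, error: e.message } # record and keep going
    end
    progress&.call(phase: :repair_related_fields, current: idx + 1, total: total)
  end

  status = removed_keys.empty? && failed_keys.empty? ? :ok : :issues_found
  { removed_keys: removed_keys, failed_keys: failed_keys, status: status }
end

client = FlakyClient.new(["customer:9:tags"])
events = []
audit = [{ orphaned_keys: ["customer:9:sessions", "customer:9:tags"] }, { orphaned_keys: nil }]
result = delete_orphans(client, audit) { |event| events << event }
```

Note the status semantics: :issues_found means orphans were present, even if every one was removed; :ok is returned only when there was nothing to delete.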

#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator

SCAN helper for enumerating keys matching a pattern.

Parameters:

  • filter (String) (defaults to: '*')

    Glob filter appended to class prefix (default: '*')

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (String)

    Each matching key

Returns:

  • (Enumerator)

    An Enumerator over the matching keys, when no block is given



# File 'lib/familia/horreum/management/repair.rb', line 281

def scan_keys(filter = '*', batch_size: 100, &block)
  pattern = dbkey(filter)
  return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given?

  cursor = "0"
  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)
    keys.each(&block)
    break if cursor == "0"
  end
end
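The block-or-Enumerator behavior can be checked against a fake client. FakeScanClient and KeyScanner are hypothetical. The "<prefix>:<filter>" pattern stands in for dbkey(filter), and the cursor is a stringified offset rather than a real SCAN cursor.

```ruby
# Hypothetical client: the cursor is a stringified offset, "0" means done.
class FakeScanClient
  def initialize(keys)
    @keys = keys.sort
  end

  def scan(cursor, match:, count:)
    start = cursor.to_i
    page = @keys[start, count] || []
    next_start = start + count
    next_cursor = next_start >= @keys.size ? "0" : next_start.to_s
    [next_cursor, page.select { |key| File.fnmatch(match, key) }]
  end
end

class KeyScanner
  def initialize(client, prefix)
    @client = client
    @prefix = prefix
  end

  # Same shape as scan_keys above; "<prefix>:<filter>" stands in for dbkey(filter).
  def scan_keys(filter = "*", batch_size: 100, &block)
    pattern = "#{@prefix}:#{filter}"
    return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given?

    cursor = "0"
    loop do
      cursor, keys = @client.scan(cursor, match: pattern, count: batch_size)
      keys.each(&block)
      break if cursor == "0"
    end
  end
end

client = FakeScanClient.new(["customer:1:object", "customer:2:object", "customer:2:tags", "session:1:object"])
scanner = KeyScanner.new(client, "customer")
objects = scanner.scan_keys("*:object", batch_size: 2).to_a
```

Returning an Enumerator lets callers chain first, each_slice, or lazy without loading every key at once, while the block form streams keys directly.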