Module: Familia::Horreum::AuditMethods

Included in:
ManagementMethods
Defined in:
lib/familia/horreum/management/audit.rb

Overview

AuditMethods provides proactive consistency detection for Horreum models.

Included in ManagementMethods so every Horreum subclass gets these as class methods (e.g. Customer.audit_instances, Customer.health_check).


Instance Method Details

#audit_cross_references(batch_size: 100) {|Hash| ... } ⇒ Hash

Audits drift between the instances ZSET and class-level unique indexes: inconsistencies that the per-registry audits cannot surface on their own.

For every live identifier in instances, verifies that each class-level unique index has an entry keyed by the object's current field value and that entry points back to the same identifier.

Two failure modes are detected:

  • in_instances_missing_unique_index: live object has a populated indexed field but no corresponding entry exists in the index.
  • index_points_to_wrong_identifier: entry exists but references a different identifier (split-brain between two objects).

Scope is limited to class-level unique indexes (within nil or :class). Multi-indexes are covered by audit_multi_indexes; instance-scoped unique indexes are out of scope for this audit.
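The two failure modes can be sketched in plain Ruby, with hashes standing in for the loaded objects and for one unique index HashKey. This is a minimal illustration under stated assumptions, not the library code: cross_reference_issues is a hypothetical helper, objects maps identifier to indexed field value, and index maps field value to identifier.

```ruby
# Hypothetical helper: classify drift for ONE class-level unique index.
# objects: { identifier => indexed field value } (stand-in for load_multi)
# index:   { field value => identifier }         (stand-in for the HashKey)
def cross_reference_issues(objects, index)
  missing = []
  wrong = []

  objects.each do |identifier, field_value|
    # Blank indexed fields are skipped, mirroring the audit.
    next if field_value.nil? || field_value.to_s.strip.empty?

    indexed_id = index[field_value.to_s]
    if indexed_id.nil?
      missing << { identifier: identifier, field_value: field_value.to_s }
    elsif indexed_id != identifier
      wrong << { field_value: field_value.to_s, expected_id: identifier, index_id: indexed_id }
    end
  end

  status = missing.empty? && wrong.empty? ? :ok : :issues_found
  {
    in_instances_missing_unique_index: missing,
    index_points_to_wrong_identifier: wrong,
    status: status,
  }
end

objects = { "c1" => "a@example.com", "c2" => "b@example.com", "c3" => "", "c4" => "d@example.com" }
index   = { "a@example.com" => "c1", "b@example.com" => "c9" }

result = cross_reference_issues(objects, index)
# c2's value points at c9 (split-brain); c4 has no entry; c3 is blank and skipped.
```

The real audit additionally records index_name and resolves lookups with HMGET per batch; this sketch keeps only the classification logic.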

Parameters:

  • batch_size (Integer) (defaults to: 100)

    load_multi batch size (default: 100)

Yields:

  • (Hash)

    Progress hash: phase: :cross_references, current:, total:

Returns:

  • (Hash)

    in_instances_missing_unique_index: [], index_points_to_wrong_identifier: [], status: (:ok or :issues_found)



# File 'lib/familia/horreum/management/audit.rb', line 169

def audit_cross_references(batch_size: 100, &progress)
  empty_result = {
    in_instances_missing_unique_index: [],
    index_points_to_wrong_identifier: [],
    status: :ok,
  }

  return empty_result unless respond_to?(:indexing_relationships)

  class_unique_rels = indexing_relationships.select { |rel|
    rel.cardinality == :unique && (rel.within.nil? || rel.within == :class)
  }
  return empty_result if class_unique_rels.empty?

  instance_ids = instances.members
  total = instance_ids.size
  processed = 0

  in_instances_missing_unique_index = []
  index_points_to_wrong_identifier = []

  instance_ids.each_slice(batch_size) do |batch|
    objects = load_multi(batch)
    processed += batch.size

    # Per unique index, collect (identifier, field_value) pairs from live
    # objects in this batch and resolve them with a single HMGET round
    # trip instead of one HGET per (object x index) combination.
    class_unique_rels.each do |rel|
      next unless respond_to?(rel.index_name)

      lookups = []
      batch.zip(objects).each do |identifier, obj|
        next unless obj

        field_value = obj.send(rel.field)
        next if field_value.nil? || field_value.to_s.strip.empty?

        lookups << [identifier, field_value.to_s]
      end
      next if lookups.empty?

      index_dbkey = send(rel.index_name).dbkey
      raw_values = dbclient.hmget(index_dbkey, *lookups.map(&:last))

      lookups.each_with_index do |(identifier, field_value_str), idx|
        indexed_id = deserialize_index_value(raw_values[idx])

        if indexed_id.nil?
          in_instances_missing_unique_index << {
            identifier: identifier,
            index_name: rel.index_name,
            field_value: field_value_str,
            existing_index_value: nil,
          }
        elsif indexed_id != identifier
          index_points_to_wrong_identifier << {
            index_name: rel.index_name,
            field_value: field_value_str,
            expected_id: identifier,
            index_id: indexed_id,
          }
        end
      end
    end

    progress&.call(phase: :cross_references, current: processed, total: total)
  end

  status = if in_instances_missing_unique_index.empty? && index_points_to_wrong_identifier.empty?
    :ok
  else
    :issues_found
  end

  {
    in_instances_missing_unique_index: in_instances_missing_unique_index,
    index_points_to_wrong_identifier: index_points_to_wrong_identifier,
    status: status,
  }
end
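The comment about collapsing per-(object, index) HGET calls into one HMGET per batch can be illustrated with a small counting stand-in. CountingHash and batched_lookup are hypothetical names used only for this sketch; they model round trips, not Redis itself. The progress block receives the same phase:, current:, total: keys the audit passes.

```ruby
# Hypothetical stand-in for a Redis hash that counts round trips.
class CountingHash
  attr_reader :round_trips

  def initialize(data)
    @data = data
    @round_trips = 0
  end

  # One round trip per field, like HGET.
  def hget(field)
    @round_trips += 1
    @data[field]
  end

  # One round trip for many fields, like HMGET.
  def hmget(*fields)
    @round_trips += 1
    fields.map { |f| @data[f] }
  end
end

# Resolve values in slices, reporting progress after each slice.
def batched_lookup(store, values, batch_size:, &progress)
  results = []
  processed = 0
  values.each_slice(batch_size) do |batch|
    results.concat(store.hmget(*batch))
    processed += batch.size
    progress&.call(phase: :cross_references, current: processed, total: values.size)
  end
  results
end

values = %w[a b c d e]

unbatched = CountingHash.new("a" => "1", "b" => "2", "c" => "3")
values.each { |v| unbatched.hget(v) }
unbatched.round_trips # => 5

batched = CountingHash.new("a" => "1", "b" => "2", "c" => "3")
events = []
found = batched_lookup(batched, values, batch_size: 2) { |p| events << p[:current] }
batched.round_trips # => 3
found               # => ["1", "2", "3", nil, nil]
events              # => [2, 4, 5]
```

Missing fields come back as nil, which is how the audit distinguishes "no index entry" from "entry points elsewhere".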

#audit_instances(batch_size: 100) {|Hash| ... } ⇒ Hash

Compares the instances timeline against actual DB keys via SCAN.

Detects:

  • Phantoms: identifiers in the timeline with no corresponding hash key
  • Missing: hash keys in the DB with no entry in the timeline
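Both categories fall out of two set differences once the identifiers are collected. A minimal sketch, assuming the timeline members and the identifiers extracted from SCANned keys are already available as arrays; diff_identifiers is a hypothetical helper, not part of the module.

```ruby
require "set"

# Hypothetical helper mirroring Phase 3 of audit_instances.
# timeline_ids: members of the instances ZSET
# scanned_ids:  identifiers extracted from SCANned hash keys
def diff_identifiers(timeline_ids, scanned_ids)
  timeline = Set.new(timeline_ids)
  scanned = Set.new(scanned_ids)
  {
    phantoms: (timeline - scanned).to_a, # listed, but no hash key
    missing: (scanned - timeline).to_a,  # hash key, but not listed
    count_timeline: timeline.size,
    count_scan: scanned.size,
  }
end

report = diff_identifiers(%w[c1 c2 c3], %w[c2 c3 c4])
report[:phantoms] # => ["c1"]
report[:missing]  # => ["c4"]
```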

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (Hash)

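    Progress hash: phase:, current:, total: (total is nil for the initial :timeline_collected phase; later events come from the SCAN pass)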

Returns:

  • (Hash)

    phantoms: [], missing: [], count_timeline: N, count_scan: N



# File 'lib/familia/horreum/management/audit.rb', line 25

def audit_instances(batch_size: 100, &progress)
  # Phase 1: Collect identifiers from timeline
  timeline_ids = Set.new(instances.members)
  progress&.call(phase: :timeline_collected, current: timeline_ids.size, total: nil)

  # Phase 2: SCAN keys and extract identifiers (source of truth)
  scan_ids = scan_identifiers(batch_size: batch_size, &progress)

  # Phase 3: Set differences
  phantoms = (timeline_ids - scan_ids).to_a
  missing = (scan_ids - timeline_ids).to_a

  {
    phantoms: phantoms,
    missing: missing,
    count_timeline: timeline_ids.size,
    count_scan: scan_ids.size,
  }
end

#audit_multi_indexes(scanned_identifiers: nil, loaded_objects: nil) ⇒ Array<Hash>

Audits all multi indexes.

For each multi index:

  • SCANs for per-value set keys
  • Checks that each member exists and field value matches
  • Detects orphaned set keys (sets for values no object has)
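The member and bucket checks above can be sketched against in-memory data. This assumes one multi index represented as a hash of field value to member set; audit_multi_index and the :object_missing / :field_mismatch reason symbols are hypothetical. Note that this sketch reports orphaned bucket values, whereas the real audit reports the per-value set keys.

```ruby
require "set"

# Hypothetical helper for ONE multi index.
# buckets: { field value => Set of member identifiers } (the per-value sets)
# objects: { identifier => current field value }        (live objects)
def audit_multi_index(buckets, objects)
  live_values = objects.values.compact.map(&:to_s).to_set
  stale = []

  buckets.each do |value, members|
    members.each do |id|
      if !objects.key?(id)
        stale << { identifier: id, value: value, reason: :object_missing }
      elsif objects[id].to_s != value
        stale << { identifier: id, value: value, reason: :field_mismatch }
      end
    end
  end

  # A bucket is orphaned when no live object currently has its value.
  orphaned = buckets.keys.reject { |value| live_values.include?(value) }
  { stale_members: stale, orphaned_keys: orphaned }
end

buckets = {
  "admin" => Set["u1", "u2"],
  "guest" => Set["u3"],
  "staff" => Set["u9"],
}
objects = { "u1" => "admin", "u2" => "guest", "u3" => "guest" }

result = audit_multi_index(buckets, objects)
# u2 sits in "admin" but is now "guest"; u9 no longer exists; "staff" is orphaned.
```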

Parameters:

  • scanned_identifiers (Array<String>, nil) (defaults to: nil)

    Internal optimization parameter; do not rely on this from external callers. When provided, skips the per-index SCAN pass used to detect missing buckets.

  • loaded_objects (Array<Horreum>, nil) (defaults to: nil)

    Internal optimization parameter aligned with scanned_identifiers.

Returns:

  • (Array<Hash>)

    One Hash per multi index, including stale_members: [] and orphaned_keys: []



# File 'lib/familia/horreum/management/audit.rb', line 89

def audit_multi_indexes(scanned_identifiers: nil, loaded_objects: nil)
  return [] unless respond_to?(:indexing_relationships)

  indexing_relationships.select { |r|
    r.cardinality == :multi
  }.map { |rel|
    audit_single_multi_index(
      rel,
      scanned_identifiers: scanned_identifiers,
      loaded_objects: loaded_objects,
    )
  }
end

#audit_participations(sample_size: nil) ⇒ Array<Hash>

Audits participation collections for stale members.

For each participation relationship defined on this class:

  • Class-level: checks the single class collection directly
  • Instance-level: SCANs for collection keys on the target class
  • Enumerates raw members of each collection
  • Verifies each referenced participant object still exists
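The stale-member check, including the sample_size cap, reduces to filtering each collection's raw members against the set of identifiers that still exist. A minimal sketch under assumptions: stale_participants is hypothetical, the "team:t1:members" keys are illustrative only, and existence is modelled as set membership rather than a database lookup.

```ruby
require "set"

# Hypothetical helper: collections maps a collection key to its raw members;
# existing_ids holds identifiers whose participant objects still exist.
def stale_participants(collections, existing_ids, sample_size: nil)
  collections.flat_map do |collection_key, members|
    # sample_size limits how many members are checked per collection.
    checked = sample_size ? members.first(sample_size) : members
    checked
      .reject { |id| existing_ids.include?(id) }
      .map { |id| { identifier: id, collection_key: collection_key, reason: :object_missing } }
  end
end

collections = {
  "team:t1:members" => %w[c1 c2 c3],
  "team:t2:members" => %w[c4],
}
existing = Set["c1", "c3"]

stale_participants(collections, existing).map { |s| s[:identifier] }
# => ["c2", "c4"]
stale_participants(collections, existing, sample_size: 1).map { |s| s[:identifier] }
# => ["c4"]
```

Sampling trades completeness for speed: with sample_size: 1 the stale c2 in the first collection goes unnoticed.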

Parameters:

  • sample_size (Integer, nil) (defaults to: nil)

    Limit members to check per collection (nil = all)

Returns:

  • (Array<Hash>)

    One Hash per audited collection, including stale_members: [{identifier:, collection_key:, reason:}]



# File 'lib/familia/horreum/management/audit.rb', line 114

def audit_participations(sample_size: nil)
  return [] unless respond_to?(:participation_relationships)

  participation_relationships.flat_map { |rel|
    if rel.target_class == self
      # Class-level participation (class_participates_in)
      [audit_class_participation(rel, sample_size: sample_size)]
    else
      # Instance-level participation (participates_in TargetClass, :collection)
      audit_instance_participations(rel, sample_size: sample_size)
    end
  }
end

#audit_related_fields ⇒ Array<Hash>

Audits instance-level related_fields (list/set/zset/hashkey) for orphaned collection keys whose parent Horreum hash no longer exists.

destroy! cleans up related fields inside a transaction, so orphans should only arise when that cleanup does not complete or is bypassed: an interrupted destroy! (e.g. a process crash), manual Redis tampering, or bugs in older code paths. This audit surfaces those cases.

Class-level related fields (class_list/class_set/class_hashkey) are intentionally skipped: their keys are Settings#prefix:field_name with no identifier segment, so they cannot be orphaned by instance destruction.
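Orphan detection amounts to parsing the identifier out of each scanned related-field key and checking whether its parent still exists. A minimal sketch, assuming a hypothetical "&lt;prefix&gt;:&lt;identifier&gt;:&lt;field_name&gt;" key layout and a pre-computed set of live identifiers; orphaned_related_keys is not part of the module, and the naive split would break for identifiers containing ":".

```ruby
require "set"

# Assumed (hypothetical) key layout: "<prefix>:<identifier>:<field_name>".
# Class-level keys ("<prefix>:<field_name>") have no identifier segment
# and are ignored, matching the reasoning above.
def orphaned_related_keys(scanned_keys, field_name, live_ids, prefix: "customer")
  scanned_keys.select do |key|
    parts = key.split(":")
    next false unless parts.size == 3 && parts[0] == prefix && parts[2] == field_name

    !live_ids.include?(parts[1]) # parent hash is gone
  end
end

keys = %w[customer:c1:tags customer:c2:tags customer:tags]
orphaned_related_keys(keys, "tags", Set["c1"])
# => ["customer:c2:tags"]
```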

Returns:

  • (Array<Hash>)

    One Hash per instance-level related field, including klass:, orphaned_keys: [...], count:, and status:



# File 'lib/familia/horreum/management/audit.rb', line 142

def audit_related_fields
  return [] unless relations?

  related_fields.values.map { |definition| audit_single_related_field(definition) }
end

#audit_unique_indexes(scanned_identifiers: nil, loaded_objects: nil) ⇒ Array<Hash>

Audits all unique indexes (class-level only, where within is nil).

For each unique index:

  • Reads all entries from the index HashKey
  • Checks that each indexed object exists and its field value matches
  • Checks for objects that should be indexed but aren't
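The stale and missing checks above can be sketched over in-memory data for a single index. This is an illustration under assumptions, not the module's implementation: audit_unique_index and its reason symbols are hypothetical, index maps field value to identifier, objects maps identifier to current field value, and filter_map needs Ruby 2.7 or later.

```ruby
# Hypothetical helper for ONE class-level unique index.
# index:   { field value => identifier } (entries read from the HashKey)
# objects: { identifier => current field value }
def audit_unique_index(index, objects)
  # Stale: the entry's object is gone, or its field no longer matches.
  stale = index.filter_map do |value, id|
    if !objects.key?(id)
      { field_value: value, indexed_id: id, reason: :object_missing }
    elsif objects[id].to_s != value
      { field_value: value, indexed_id: id, reason: :value_mismatch }
    end
  end

  # Missing: a live object has a value, but the index has no entry for it.
  missing = objects.filter_map do |id, value|
    next if value.nil? || value.to_s.strip.empty?

    { identifier: id, field_value: value.to_s } unless index.key?(value.to_s)
  end

  { stale: stale, missing: missing }
end

index = { "a@x.io" => "c1", "b@x.io" => "c2", "old@x.io" => "c3" }
objects = { "c1" => "a@x.io", "c2" => "new@x.io", "c4" => "d@x.io" }
result = audit_unique_index(index, objects)
# c2 changed its value without reindexing; c3 was destroyed; c4 was never indexed.
```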

Parameters:

  • scanned_identifiers (Array<String>, nil) (defaults to: nil)

    Internal optimization parameter; do not rely on this from external callers. When provided (e.g. threaded through from health_check), skips the per-index SCAN pass. When omitted, each index computes its own scan.

  • loaded_objects (Array<Horreum>, nil) (defaults to: nil)

    Internal optimization parameter aligned with scanned_identifiers. When provided, skips the per-index load_multi call.

Returns:

  • (Array<Hash>)

    One Hash per unique index, including stale: [...] and missing: [...]



# File 'lib/familia/horreum/management/audit.rb', line 61

def audit_unique_indexes(scanned_identifiers: nil, loaded_objects: nil)
  return [] unless respond_to?(:indexing_relationships)

  indexing_relationships.select { |r|
    r.cardinality == :unique && r.within.nil?
  }.map { |rel|
    audit_single_unique_index(
      rel,
      scanned_identifiers: scanned_identifiers,
      loaded_objects: loaded_objects,
    )
  }
end

#health_check(batch_size: 100, sample_size: nil, audit_collections: false, check_cross_refs: false) {|Hash| ... } ⇒ AuditReport

Runs all audits and wraps results in an AuditReport.

The related_fields audit is opt-in via audit_collections: true because it performs an additional SCAN per instance-level field. When omitted (or false), AuditReport#related_fields is nil, which signals "not checked" rather than "checked and clean".

The cross-references audit is opt-in via check_cross_refs: true. It walks every identifier in the instances ZSET and cross-checks each class-level unique index; skipping it keeps the default health_check fast. When omitted (or false), AuditReport#cross_references is nil, signalling "not checked".
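Code that consumes these results should keep the nil-versus-clean distinction explicit. A minimal sketch, assuming the value passed in is the Hash returned by audit_cross_references (or nil when the audit was skipped); cross_reference_state and its return symbols are hypothetical.

```ruby
# Hypothetical helper: interpret the opt-in cross-references section.
# nil means the audit was skipped; a Hash carries its own :status.
def cross_reference_state(section)
  return :not_checked if section.nil? # check_cross_refs: false

  section[:status] == :ok ? :clean : :issues_found
end

cross_reference_state(nil)                       # => :not_checked
cross_reference_state({ status: :ok })           # => :clean
cross_reference_state({ status: :issues_found }) # => :issues_found
```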

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN count hint for the instances audit and the shared index scan; also passed as batch_size to audit_cross_references

  • sample_size (Integer, nil) (defaults to: nil)

    Maximum members checked per collection in the participation audit (nil = all)

  • audit_collections (Boolean) (defaults to: false)

    When true, also run audit_related_fields

  • check_cross_refs (Boolean) (defaults to: false)

    When true, also run audit_cross_references

Yields:

  • (Hash)

    Progress from audit_instances, and from audit_cross_references when check_cross_refs: true

Returns:

  • (AuditReport)

    Results of every audit that ran; skipped opt-in audits are nil



# File 'lib/familia/horreum/management/audit.rb', line 271

def health_check(batch_size: 100, sample_size: nil, audit_collections: false,
                 check_cross_refs: false, &progress)
  start_time = Familia.now

  inst = audit_instances(batch_size: batch_size, &progress)

  # Reuse the SCAN pass and the HGETALL pipeline across both index
  # audits. Without this, a model with N unique indexes and M multi
  # indexes would trigger N+M additional SCANs and load_multi round
  # trips during their "missing" phases.
  has_indexes = respond_to?(:indexing_relationships) && indexing_relationships.any? { |r|
    (r.cardinality == :unique && r.within.nil?) || r.cardinality == :multi
  }

  if has_indexes
    shared_ids = scan_identifiers(batch_size: batch_size).to_a
    shared_objects = load_multi(shared_ids)
  else
    shared_ids = nil
    shared_objects = nil
  end

  uniq = audit_unique_indexes(
    scanned_identifiers: shared_ids,
    loaded_objects: shared_objects,
  )
  multi = audit_multi_indexes(
    scanned_identifiers: shared_ids,
    loaded_objects: shared_objects,
  )
  parts = audit_participations(sample_size: sample_size)
  related = audit_collections ? audit_related_fields : nil
  cross_refs = check_cross_refs ? audit_cross_references(batch_size: batch_size, &progress) : nil

  duration = Familia.now - start_time

  AuditReport.new(
    model_class: name,
    audited_at: start_time,
    instances: inst,
    unique_indexes: uniq,
    multi_indexes: multi,
    participations: parts,
    related_fields: related,
    cross_references: cross_refs,
    duration: duration
  )
end