Module: Familia::DataType::CollectionBase
Included in: HashKey, ListKey, SortedSet, UnsortedSet
Defined in: lib/familia/data_type/collection_base.rb
Overview
CollectionBase - Base module for iterable DataType classes
Collection types represent multi-value structures in Redis (LIST, SET, ZSET, HASH). They include Enumerable and provide batch iteration via each_record for reference collections.
Each collection type must implement its own each method that:
- Yields each element to the block when one is given
- Returns an Enumerator when no block is given
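The each contract above can be sketched with a small in-memory class. TinyCollection is a hypothetical stand-in, not part of Familia, and it holds its members in an Array rather than in Redis; the point is only the block/Enumerator split and what Enumerable provides on top of it.

```ruby
# Hypothetical in-memory stand-in for a collection type; not part of Familia.
class TinyCollection
  include Enumerable

  def initialize(*members)
    @members = members
  end

  # Yields each member when a block is given; otherwise returns an Enumerator.
  def each(&block)
    return to_enum(:each) unless block

    @members.each(&block)
    self
  end
end

collection = TinyCollection.new('a', 'b', 'c')

collected = []
collection.each { |member| collected << member } # block form
enumerator = collection.each                     # no block: an Enumerator
upcased = collection.map(&:upcase)               # provided by Enumerable
```

Because Enumerable builds map, select, first and the rest on top of each, a collection type only has to get this one method right.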
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary
- #collection_type? ⇒ Boolean
- #each_record(batch_size: 100, pipeline: nil, **filters) {|record| ... } ⇒ Enumerator, self
  Iterates over identifiers, loading each as a Horreum record.
Instance Method Details
#collection_type? ⇒ Boolean
    # File 'lib/familia/data_type/collection_base.rb', line 34

    def collection_type?
      self.class.collection_type?
    end
#each_record(batch_size: 100, pipeline: nil, **filters) {|record| ... } ⇒ Enumerator, self
Iterates over identifiers, loading each as a Horreum record.
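This method is designed for DataTypes that store object identifiers.
The collection must know which class to hydrate. That class comes from one of two options, checked in this order:
- the record_class: option, a loading-only hint that does not change read deserialization (used by participates_in)
- the class: option on a reference: true collection (e.g. instances, unique_index)
Records are loaded in batches of batch_size through the record class's load_multi method, and each loaded record is yielded to the block. When pipeline is given, the block runs for groups of that many records inside record_class.pipelined. A Familia::Problem is raised if neither option supplies a class that responds to load_multi.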
Ghost identifiers (where the underlying key has expired) are silently filtered out.
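To make the batching and ghost filtering concrete, here is a minimal sketch of the serial path (pipeline: nil) only. Everything in it is a hypothetical stand-in: FakeRecordClass models just a load_multi that returns nil for missing identifiers, and EachRecordSketch slices an Array of identifiers rather than reading from Redis.

```ruby
# Hypothetical stand-in for a Horreum class; only load_multi is modeled.
class FakeRecordClass
  STORE = { 'u1' => 'Ada', 'u2' => 'Grace', 'u4' => 'Edsger' }.freeze

  class << self
    attr_reader :calls

    # Returns one entry per identifier, with nil for identifiers whose
    # key no longer exists (a "ghost").
    def load_multi(ids)
      (@calls ||= []) << ids.dup
      ids.map { |id| STORE[id] }
    end
  end
end

# Simplified sketch of the serial path; not the library implementation.
module EachRecordSketch
  def self.each_record(identifiers, record_class, batch_size: 100)
    unless block_given?
      return to_enum(:each_record, identifiers, record_class, batch_size: batch_size)
    end

    identifiers.each_slice(batch_size) do |ids|
      # One load_multi call per batch; compact drops the ghosts.
      record_class.load_multi(ids).compact.each { |record| yield record }
    end
    self
  end
end

# 'u3' has no entry in STORE, so it is skipped without raising.
names = EachRecordSketch.each_record(%w[u1 u2 u3 u4], FakeRecordClass, batch_size: 2).to_a
```

With a batch size of 2, the four identifiers are loaded in two load_multi calls, and the ghost 'u3' never reaches the block.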
    # File 'lib/familia/data_type/collection_base.rb', line 74

    def each_record(batch_size: 100, pipeline: nil, **filters, &block)
      return to_enum(:each_record, batch_size: batch_size, pipeline: pipeline, **filters) unless block

      # Determine the class to load records from.
      #
      # Two opts can supply it, in priority order:
      #   - :record_class — a loading-only hint. It tells each_record which
      #     class to hydrate, WITHOUT affecting how the collection deserializes
      #     reads (members/member?/score keep the generic DataType semantics).
      #     Used by participates_in collections (issue #297).
      #   - :class — set alongside `reference: true` for true reference
      #     collections (e.g. `instances`, unique_index), where raw-string read
      #     semantics are also wanted.
      #
      # load_multi is identifier-type-tolerant (it builds keys via dbkey), so
      # whichever path `each` yields the identifier through, loading works.
      record_class = @opts[:record_class] || @opts[:class]
      unless record_class&.respond_to?(:load_multi)
        raise Familia::Problem,
          "each_record requires a DataType with a :record_class (or :class) option that responds to load_multi"
      end

      # Validate batch_size and pipeline constraints
      raise ArgumentError, "batch_size must be a positive integer (got #{batch_size.inspect})" unless batch_size.is_a?(Integer) && batch_size.positive?
      raise ArgumentError, "pipeline must be nil or a positive integer (got #{pipeline.inspect})" unless pipeline.nil? || (pipeline.is_a?(Integer) && pipeline.positive?)
      raise ArgumentError, "pipeline (#{pipeline}) cannot exceed batch_size (#{batch_size})" if pipeline&.> batch_size

      # Collect identifiers in batches
      buffer = []

      process_batch = lambda do |ids|
        return if ids.empty?

        # Load records using the class's load_multi (pipelined HGETALLs)
        records = record_class.load_multi(ids)

        # Filter out ghosts (nil results from expired keys)
        live_records = records.compact

        if pipeline.nil?
          # Serial mode - no pipelining, execute block for each record directly
          live_records.each { |record| block.call(record) }
        else
          # Pipelined mode - group records and wrap each group in a pipeline
          live_records.each_slice(pipeline) do |group|
            record_class.pipelined do
              group.each { |record| block.call(record) }
            end
          end
        end
      end

      # Iterate using the type's each method with any filters
      each(**filters) do |member|
        # HashKey yields [field, value] pairs where the value is the stored
        # object identifier (e.g. unique_index maps field_value => identifier),
        # so extract the value. List/Set/SortedSet yield the identifier directly.
        identifier = member.is_a?(Array) ? member.last : member
        buffer << identifier

        if buffer.size >= batch_size
          process_batch.call(buffer)
          buffer.clear
        end
      end

      # Process remaining items
      process_batch.call(buffer) unless buffer.empty?

      self
    end
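The argument checks in the listing are easy to lose among the rest of the method, so the sketch below isolates them. BatchOptions is a hypothetical helper written for illustration, not part of Familia. It mirrors the three ArgumentError conditions and shows how each_slice groups live records when pipeline is set.

```ruby
# Hypothetical helper that mirrors the three argument checks in each_record.
module BatchOptions
  def self.validate!(batch_size:, pipeline:)
    unless batch_size.is_a?(Integer) && batch_size.positive?
      raise ArgumentError, "batch_size must be a positive integer (got #{batch_size.inspect})"
    end

    unless pipeline.nil? || (pipeline.is_a?(Integer) && pipeline.positive?)
      raise ArgumentError, "pipeline must be nil or a positive integer (got #{pipeline.inspect})"
    end

    if pipeline && pipeline > batch_size
      raise ArgumentError, "pipeline (#{pipeline}) cannot exceed batch_size (#{batch_size})"
    end

    true
  end

  # Groups records the way the pipelined branch does, using each_slice.
  def self.pipeline_groups(records, pipeline)
    records.each_slice(pipeline).to_a
  end
end

BatchOptions.validate!(batch_size: 100, pipeline: nil) # serial mode
BatchOptions.validate!(batch_size: 100, pipeline: 25)  # pipelined mode

# Five live records with pipeline: 2 produce three groups: 2, 2 and 1.
groups = BatchOptions.pipeline_groups(%w[r1 r2 r3 r4 r5], 2)
```

A pipeline value equal to batch_size is accepted; only values larger than batch_size are rejected, since a group can never hold more records than one batch loads.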