Module: Familia::DataType::CollectionBase

Included in:
HashKey, ListKey, SortedSet, UnsortedSet
Defined in:
lib/familia/data_type/collection_base.rb

Overview

CollectionBase - Base module for iterable DataType classes

Collection types represent multi-value structures in Redis (LIST, SET, ZSET, HASH). They include Enumerable and provide batch iteration via each_record for reference collections.

Each collection type must implement its own each method that:

  • Yields elements to the block when given
  • Returns an Enumerator when no block given
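A minimal sketch of that `each` contract follows. It uses a hypothetical in-memory `ArrayBackedList` class that stands in for a Redis-backed type and is not part of Familia. Including `Enumerable` is what makes `map`, `select` and friends available once `each` exists.

```ruby
# Hypothetical stand-in for a collection type; real Familia types read
# their members from Redis rather than from an in-memory Array.
class ArrayBackedList
  include Enumerable

  def initialize(members)
    @members = members
  end

  # CollectionBase contract: yield each element when a block is given,
  # return an Enumerator when called without one.
  def each(&block)
    return to_enum(:each) unless block

    @members.each(&block)
    self
  end
end

list = ArrayBackedList.new(%w[a b c])
seen = []
list.each { |m| seen << m }   # seen is now ["a", "b", "c"]
list.each.next                # => "a"
list.map(&:upcase)            # => ["A", "B", "C"] (provided by Enumerable)
```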

Examples:

Collection types

ListKey     - Redis LIST
UnsortedSet - Redis SET
SortedSet   - Redis ZSET
HashKey     - Redis HASH

Defined Under Namespace

Modules: ClassMethods


Instance Method Details

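#collection_type? ⇒ Boolean

Returns whether this DataType is a collection type. The instance method delegates to the class-level collection_type? method.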

Returns:

  • (Boolean)


# File 'lib/familia/data_type/collection_base.rb', line 34

def collection_type?
  self.class.collection_type?
end
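The instance predicate simply asks its class. The sketch below shows one common way such a class-level flag could be wired through a `ClassMethods` module, namely the `self.included` + `extend` idiom. Treat this as an assumption: Familia's actual wiring may differ, and the names `CollectionBaseSketch`, `ListKeySketch` and `StringKeySketch` are hypothetical.

```ruby
module CollectionBaseSketch
  # Class-level API, mixed into any class that includes this module.
  module ClassMethods
    def collection_type?
      true
    end
  end

  def self.included(base)
    base.extend(ClassMethods)
  end

  # Instance-level predicate delegates to the class, as in the source above.
  def collection_type?
    self.class.collection_type?
  end
end

class ListKeySketch
  include CollectionBaseSketch
end

# A hypothetical non-collection type answers false at the class level.
class StringKeySketch
  def self.collection_type?
    false
  end

  def collection_type?
    self.class.collection_type?
  end
end

ListKeySketch.new.collection_type?    # => true
StringKeySketch.new.collection_type?  # => false
```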

#each_record(batch_size: 100, pipeline: nil, **filters) {|record| ... } ⇒ Enumerator, self

Iterates over identifiers, loading each as a Horreum record.

This method is designed for DataTypes that store object identifiers (typically with reference: true). It loads records in batches using the load_multi method of the class given in the DataType's :class option and yields each loaded record.

Ghost identifiers (where the underlying key has expired) are silently filtered out.
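The batching and ghost-filtering behavior can be modeled without Redis. This is a simplified sketch: pipelining and filters are left out, and `each_slice` on an Array stands in for the buffer the real method fills from `each`. It also assumes a hypothetical `FakeRecordClass` whose `load_multi` returns `nil` for identifiers whose keys no longer exist, which is the shape the implementation expects from a Horreum class.

```ruby
# Hypothetical stand-in for a Horreum class: load_multi returns one
# entry per identifier, nil where the underlying key no longer exists.
class FakeRecordClass
  attr_reader :calls

  def initialize(live_ids)
    @live_ids = live_ids
    @calls = []
  end

  def load_multi(ids)
    @calls << ids.dup # remember each batch for inspection
    ids.map { |id| @live_ids.include?(id) ? "record:#{id}" : nil }
  end
end

# Simplified each_record loop: load identifiers in batches of
# batch_size, drop ghosts (nil results), yield the live records.
def each_record_sketch(identifiers, record_class, batch_size:)
  identifiers.each_slice(batch_size) do |ids|
    record_class.load_multi(ids).compact.each { |record| yield record }
  end
end

loader = FakeRecordClass.new(%w[u1 u2 u4 u5])
seen = []
each_record_sketch(%w[u1 u2 u3 u4 u5], loader, batch_size: 2) { |r| seen << r }

seen          # => ["record:u1", "record:u2", "record:u4", "record:u5"]
loader.calls  # => [["u1", "u2"], ["u3", "u4"], ["u5"]]
```

The ghost "u3" still costs a lookup inside its batch. It is only dropped after load_multi returns, so the block never sees it.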

Examples:

Iterate over all records (no pipelining, safe default)

User.instances.each_record { |user| user.deactivate! }

With time filter (for SortedSet)

User.instances.each_record(since: 1.day.ago) { |u| notify(u) }

Pipeline writes in groups

items.each_record(batch_size: 500, pipeline: 50) { |r| r.foo! 'bar' }

Parameters:

  • batch_size (Integer) (defaults to: 100)

    Number of identifiers to load per batch

  • pipeline (Integer, nil) (defaults to: nil)

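    Controls pipelining of writes made in the block. When nil (default), the block runs for each record with no pipelining. When a positive integer, each loaded batch is split into groups of this size, and the block runs for every record in a group inside a single record_class.pipelined call, so fast writers in the block are sent as one pipeline per group. Must not exceed batch_size.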

  • filters (Hash)

    Additional filter parameters passed to each. Available filters depend on the collection type:

    • SortedSet: since:, until:, cursor_batch_size:
    • UnsortedSet/HashKey: matching:, cursor_batch_size:
    • ListKey: cursor_batch_size: only

    Passing unsupported filters raises ArgumentError.
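To make the relation between batch_size and pipeline concrete, the sketch below reproduces the grouping step for one loaded batch. It uses a hypothetical `FakePipelineClass` whose `pipelined` method only counts the writes made inside it instead of queuing commands on a Redis connection. The `write` method is likewise a placeholder for a fast writer.

```ruby
# Hypothetical stand-in for record_class: pipelined counts the writes
# made inside each group rather than talking to Redis.
class FakePipelineClass
  attr_reader :group_sizes

  def initialize
    @group_sizes = []
    @current = nil
  end

  def pipelined
    @current = 0
    yield
    @group_sizes << @current
  ensure
    @current = nil
  end

  # Placeholder for a fast writer; only counted inside pipelined.
  def write(_record)
    @current += 1 if @current
  end
end

# One loaded batch of 7 live records, processed with pipeline: 3.
klass = FakePipelineClass.new
live_records = (1..7).to_a
live_records.each_slice(3) do |group|
  klass.pipelined do
    group.each { |record| klass.write(record) }
  end
end

klass.group_sizes # => [3, 3, 1]
```

With batch_size: 500 and pipeline: 50, a batch of 500 live records gives 500 / 50 = 10 pipelined groups. Ghosts shrink the last group rather than being replaced.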

Yields:

  • (record)

    Each loaded Horreum record (non-nil)

Returns:

  • (Enumerator, self)

    An Enumerator if no block is given; otherwise self.
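The block-less form returns to_enum(:each_record, ...) with the same keyword arguments. As a result, the Enumerator can be chained with Enumerable methods, and the work runs again each time it is iterated. The sketch below shows that pattern in a self-contained form using a hypothetical `RecordSource` class that is not part of Familia.

```ruby
# Hypothetical class mirroring the shape of each_record.
class RecordSource
  def initialize(records)
    @records = records
  end

  # Without a block, return an Enumerator that re-invokes this
  # method with the same keywords when it is iterated.
  def each_record(batch_size: 100, &block)
    return to_enum(:each_record, batch_size: batch_size) unless block

    @records.each_slice(batch_size) { |batch| batch.each(&block) }
    self
  end
end

source = RecordSource.new(%w[alice bob carol dave])
enum = source.each_record(batch_size: 2)
enum.class             # => Enumerator
enum.map(&:capitalize) # => ["Alice", "Bob", "Carol", "Dave"]
enum.first(3)          # => ["alice", "bob", "carol"]
```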

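Raises:

  • (ArgumentError)

    If batch_size is not a positive Integer, pipeline is neither nil nor a positive Integer, pipeline exceeds batch_size, or an unsupported filter is passed.

  • (Familia::Problem)

    If the DataType has no :class option that responds to load_multi.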
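The argument checks can be exercised in isolation. The helper below is hypothetical: it copies the three batch_size/pipeline conditions from the method body so the accepted and rejected combinations are easy to see. It does not model the earlier Familia::Problem check or filter validation.

```ruby
# Hypothetical helper reproducing each_record's batch_size/pipeline checks.
def validate_each_record_args!(batch_size:, pipeline:)
  unless batch_size.is_a?(Integer) && batch_size.positive?
    raise ArgumentError, "batch_size must be a positive integer (got #{batch_size.inspect})"
  end
  unless pipeline.nil? || (pipeline.is_a?(Integer) && pipeline.positive?)
    raise ArgumentError, "pipeline must be nil or a positive integer (got #{pipeline.inspect})"
  end
  if pipeline && pipeline > batch_size
    raise ArgumentError, "pipeline (#{pipeline}) cannot exceed batch_size (#{batch_size})"
  end
  true
end

# Returns the error message, or nil when the arguments are accepted.
def rejection(**args)
  validate_each_record_args!(**args)
  nil
rescue ArgumentError => e
  e.message
end

rejection(batch_size: 100, pipeline: nil) # => nil
rejection(batch_size: 500, pipeline: 50)  # => nil
rejection(batch_size: 0, pipeline: nil)   # => "batch_size must be a positive integer (got 0)"
rejection(batch_size: 10, pipeline: 20)   # => "pipeline (20) cannot exceed batch_size (10)"
```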


# File 'lib/familia/data_type/collection_base.rb', line 70

def each_record(batch_size: 100, pipeline: nil, **filters, &block)
  return to_enum(:each_record, batch_size: batch_size, pipeline: pipeline, **filters) unless block

  # Determine the class to load records from
  # For reference DataTypes, @opts[:class] holds the Horreum class
  record_class = @opts[:class]
  unless record_class&.respond_to?(:load_multi)
    raise Familia::Problem, "each_record requires a reference DataType with a :class option that responds to load_multi"
  end

  # Validate batch_size and pipeline constraints
  raise ArgumentError, "batch_size must be a positive integer (got #{batch_size.inspect})" unless batch_size.is_a?(Integer) && batch_size.positive?
  raise ArgumentError, "pipeline must be nil or a positive integer (got #{pipeline.inspect})" unless pipeline.nil? || (pipeline.is_a?(Integer) && pipeline.positive?)
  raise ArgumentError, "pipeline (#{pipeline}) cannot exceed batch_size (#{batch_size})" if pipeline&.> batch_size

  # Collect identifiers in batches
  buffer = []

  process_batch = lambda do |ids|
    return if ids.empty?

    # Load records using the class's load_multi (pipelined HGETALLs)
    records = record_class.load_multi(ids)

    # Filter out ghosts (nil results from expired keys)
    live_records = records.compact

    if pipeline.nil?
      # Serial mode - no pipelining, execute block for each record directly
      live_records.each { |record| block.call(record) }
    else
      # Pipelined mode - group records and wrap each group in a pipeline
      live_records.each_slice(pipeline) do |group|
        record_class.pipelined do
          group.each { |record| block.call(record) }
        end
      end
    end
  end

  # Iterate using the type's each method with any filters
  each(**filters) do |member|
    # HashKey yields [field, value] pairs; extract field as identifier
    identifier = member.is_a?(Array) ? member.first : member
    buffer << identifier

    if buffer.size >= batch_size
      process_batch.call(buffer)
      buffer.clear
    end
  end

  # Process remaining items
  process_batch.call(buffer) unless buffer.empty?

  self
end
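The identifier extraction inside the each loop lets one method serve every collection type. For a HashKey, the field, not the value, is treated as the identifier. The sketch below applies the same rule to hypothetical sample members. The HashKey [field, value] shape comes from the code comment above, and the sample data is invented for illustration.

```ruby
# Same rule as in each_record: Array members (e.g. HashKey's
# [field, value] pairs) contribute their first element; scalar
# members are used as-is.
def extract_identifier(member)
  member.is_a?(Array) ? member.first : member
end

hash_members = [["user:1", "2024-01-01"], ["user:2", "2024-01-02"]]
set_members  = %w[user:3 user:4]

hash_members.map { |m| extract_identifier(m) } # => ["user:1", "user:2"]
set_members.map { |m| extract_identifier(m) }  # => ["user:3", "user:4"]
```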