Class: JobIteration::EnumeratorBuilder

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/job-iteration/enumerator_builder.rb

Defined Under Namespace

Classes: Wrapper


Constructor Details

#initialize(job, wrapper: Wrapper) ⇒ EnumeratorBuilder

Returns a new instance of EnumeratorBuilder.



# File 'lib/job-iteration/enumerator_builder.rb', line 34

def initialize(job, wrapper: Wrapper)
  @job = job
  @wrapper = wrapper
end

Instance Method Details

#build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args) ⇒ Object Also known as: active_record_on_batch_relations

Builds an Enumerator from an Active Record Relation and enumerates on batches, yielding Active Record Relations. See the documentation for #build_active_record_enumerator_on_batches.



# File 'lib/job-iteration/enumerator_builder.rb', line 171

def build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args)
  enum = JobIteration::ActiveRecordBatchEnumerator.new(
    scope,
    cursor: cursor,
    **args,
  ).each
  enum = wrap(self, enum) if wrap
  enum
end

#build_active_record_enumerator_on_batches(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_batches

Builds an Enumerator from an Active Record Relation and enumerates on batches of records. Each Enumerator tick moves the cursor batch_size rows forward.

batch_size: sets how many records will be fetched in one batch. Defaults to 100.

For the rest of the arguments, see the documentation for #build_active_record_enumerator_on_records.



# File 'lib/job-iteration/enumerator_builder.rb', line 145

def build_active_record_enumerator_on_batches(scope, cursor:, **args)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    **args,
  ).batches
  wrap(self, enum)
end
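The batching behaviour can be modelled without a database. The sketch below is a plain-Ruby approximation, not the gem's implementation: it assumes the cursor is the primary key of the last row in the previous batch, uses an in-memory array of ids in place of a relation, and `batch_enumerator` is a hypothetical name.

```ruby
# Plain-Ruby approximation of keyset batching (hypothetical helper, not the gem's code).
# `ids` stands in for the primary keys of a relation; `cursor` is the last id already processed.
def batch_enumerator(ids, cursor:, batch_size: 100)
  Enumerator.new do |yielder|
    position = cursor
    loop do
      # Equivalent of: WHERE id > position ORDER BY id LIMIT batch_size
      batch = ids.select { |id| position.nil? || id > position }.sort.first(batch_size)
      break if batch.empty?

      position = batch.last # the cursor becomes the last id of the batch
      yielder.yield(batch, position)
    end
  end
end

batch_enumerator((1..7).to_a, cursor: nil, batch_size: 3).to_a
# => [[[1, 2, 3], 3], [[4, 5, 6], 6], [[7], 7]]
```

Resuming with `cursor: 3` skips the first batch and starts at id 4.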

#build_active_record_enumerator_on_records(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_records

Builds an Enumerator from an Active Record Relation. Each Enumerator tick moves the cursor one row forward.

The columns: argument is used to build the actual query for iteration. It defaults to the primary key, so the first query looks like this:

1) SELECT * FROM users ORDER BY id LIMIT 100

When iteration is resumed, the cursor: and columns: values are used to continue from the point where iteration stopped:

2) SELECT * FROM users WHERE id > $CURSOR ORDER BY id LIMIT 100

columns: can also take more than one column. In that case, the cursor will contain the serialized values of all columns at the point where iteration stopped.

Consider this example with columns: [:created_at, :id]. Here’s the query it will use on the first iteration:

1) SELECT * FROM `products` ORDER BY created_at, id LIMIT 100

And the query on the next iteration:

2) SELECT * FROM `products`
     WHERE (created_at > '$LAST_CREATED_AT_CURSOR'
       OR (created_at = '$LAST_CREATED_AT_CURSOR' AND (id > '$LAST_ID_CURSOR')))
     ORDER BY created_at, id LIMIT 100

As a result of this query pattern, if the values in these columns change for records in scope during iteration, those records may be skipped or yielded multiple times, depending on the nature of the update and the cursor’s value. If an already-yielded record’s value is updated to something greater than the cursor’s value, the record will be yielded again. Similarly, if a not-yet-yielded record’s value is updated to something less than the cursor’s value, the record will be skipped.
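The composite-cursor condition is a lexicographic comparison of (created_at, id) tuples. The plain-Ruby sketch below reproduces that WHERE clause over in-memory rows and shows a row being yielded twice after its created_at is raised past the cursor. The helpers `after_cursor?` and `page` are hypothetical, and integers stand in for timestamps.

```ruby
# Sketch of the WHERE clause for columns: [:created_at, :id] (hypothetical helpers, not the gem's code).
# Integers stand in for created_at timestamps.
def after_cursor?(row, cursor)
  created_at, id = cursor
  row[:created_at] > created_at ||
    (row[:created_at] == created_at && row[:id] > id)
end

# Equivalent of: WHERE <condition> ORDER BY created_at, id LIMIT limit
def page(rows, cursor, limit)
  rows.select { |row| cursor.nil? || after_cursor?(row, cursor) }
      .sort_by { |row| [row[:created_at], row[:id]] }
      .first(limit)
end

products = [
  { id: 1, created_at: 10 },
  { id: 2, created_at: 10 },
  { id: 3, created_at: 20 },
]

first_page = page(products, nil, 2)                           # ids 1 and 2
cursor = [first_page.last[:created_at], first_page.last[:id]] # [10, 2]

products[0][:created_at] = 30 # row 1 is updated past the cursor during iteration
page(products, cursor, 2).map { |row| row[:id] }              # => [3, 1]: row 1 is yielded again
```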



# File 'lib/job-iteration/enumerator_builder.rb', line 115

def build_active_record_enumerator_on_records(scope, cursor:, **args)
  enum = build_active_record_enumerator(
    scope,
    cursor: cursor,
    **args,
  ).records
  wrap(self, enum)
end

#build_array_enumerator(enumerable, cursor:) ⇒ Object Also known as: array

Builds an Enumerator object from a given array, using the cursor as an offset: iteration resumes at the element after index cursor.



# File 'lib/job-iteration/enumerator_builder.rb', line 54

def build_array_enumerator(enumerable, cursor:)
  unless enumerable.is_a?(Array)
    raise ArgumentError, "enumerable must be an Array"
  end

  drop =
    if cursor.nil?
      0
    else
      cursor + 1
    end

  wrap(self, enumerable.each_with_index.drop(drop).to_enum { enumerable.size - drop })
end
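The offset arithmetic can be checked in isolation. This sketch mirrors the method body above without the wrap call, so it yields raw [element, index] pairs; `array_pairs` is a hypothetical name.

```ruby
# Mirrors build_array_enumerator without wrapping (hypothetical helper name).
# The cursor is the index of the last element already processed.
def array_pairs(array, cursor:)
  drop = cursor.nil? ? 0 : cursor + 1
  # to_enum's block supplies a lazily computed size for the Enumerator.
  array.each_with_index.drop(drop).to_enum { array.size - drop }
end

enum = array_pairs(%w[a b c], cursor: 0)
enum.size # => 2
enum.to_a # => [["b", 1], ["c", 2]]
```

Because the index of each yielded element becomes the next cursor, a job interrupted after processing "b" resumes with cursor 1 and continues at "c".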

#build_csv_enumerator(enumerable, cursor:) ⇒ Object Also known as: csv



# File 'lib/job-iteration/enumerator_builder.rb', line 191

def build_csv_enumerator(enumerable, cursor:)
  enum = CsvEnumerator.new(enumerable).rows(cursor: cursor)
  wrap(self, enum)
end

#build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100) ⇒ Object Also known as: csv_on_batches



# File 'lib/job-iteration/enumerator_builder.rb', line 196

def build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100)
  enum = CsvEnumerator.new(enumerable).batches(cursor: cursor, batch_size: batch_size)
  wrap(self, enum)
end

#build_nested_enumerator(enums, cursor:) ⇒ Object Also known as: nested

Builds an Enumerator for nested iteration.

Examples:

def build_enumerator(cursor:)
  enumerator_builder.nested(
    [
      ->(cursor) {
        enumerator_builder.active_record_on_records(Shop.all, cursor: cursor)
      },
      ->(shop, cursor) {
        enumerator_builder.active_record_on_records(shop.products, cursor: cursor)
      },
      ->(_shop, product, cursor) {
        enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor)
      }
    ],
    cursor: cursor
  )
end

def each_iteration(product_variants_relation)
  # do something
end

Parameters:

  • enums (Array<Proc>)

    an Array of Procs, each of which should return an Enumerator. Each proc in enums should accept the items yielded by the parent enumerators and the `cursor` as its arguments.

    Each proc’s `cursor` argument is its element of the `build_enumerator`’s `cursor` array.

  • cursor (Array<Object>)

    array of offsets for each of the enums to start iteration from



# File 'lib/job-iteration/enumerator_builder.rb', line 231

def build_nested_enumerator(enums, cursor:)
  enum = NestedEnumerator.new(enums, cursor: cursor).each
  wrap(self, enum)
end
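The cursor-array bookkeeping is easiest to see in a simplified model. The sketch below is not the gem's NestedEnumerator; it assumes each proc returns an enumerator of [item, cursor] pairs (as build_array_enumerator does) and that a level’s cursor advances only once all of its children are exhausted, so resuming re-enters the parent that was in progress. `NestedSketch` and `array_enum` are hypothetical names.

```ruby
# Simplified model of nested iteration over a cursor array (not the gem's NestedEnumerator).
class NestedSketch
  def initialize(enums, cursor:)
    @enums = enums
    @cursors = cursor.dup # one slot per nesting level
  end

  def each(&block)
    iterate([], 0, &block)
  end

  private

  def iterate(parents, depth, &block)
    @enums[depth].call(*parents, @cursors[depth]).each do |item, item_cursor|
      if depth == @enums.size - 1
        # Innermost level: yield the item plus the full cursor array to resume from.
        resume_at = @cursors.dup
        resume_at[depth] = item_cursor
        block.call(item, resume_at)
      else
        iterate(parents + [item], depth + 1, &block)
        # The child level finished; the next parent starts its children from scratch.
        @cursors[depth + 1] = nil
      end
      # Advance this level only after the item (and all of its children) are done.
      @cursors[depth] = item_cursor
    end
  end
end

# Hypothetical stand-in for build_array_enumerator: [element, index] pairs after the cursor.
def array_enum(array, cursor)
  array.each_with_index.drop(cursor.nil? ? 0 : cursor + 1).each
end

enums = [
  ->(cursor) { array_enum(%w[a b], cursor) },
  ->(_letter, cursor) { array_enum([1, 2], cursor) },
]

results = []
NestedSketch.new(enums, cursor: [nil, 0]).each { |item, cursor| results << [item, cursor] }
results # => [[2, [nil, 1]], [1, [0, 0]], [2, [0, 1]]]
```

Resuming from [nil, 0] finishes the inner items of "a" before moving on to "b", which is why the outer slot stays nil until "a" is fully processed.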

#build_once_enumerator(cursor:) ⇒ Object Also known as: once

Builds an Enumerator object that iterates once.



# File 'lib/job-iteration/enumerator_builder.rb', line 42

def build_once_enumerator(cursor:)
  wrap(self, build_times_enumerator(1, cursor: cursor))
end

#build_parallel_active_record_enumerator_on_batches(scope, instances:, cursor:, **args) ⇒ Object Also known as: parallel_active_record_on_batches

Builds an Enumerator that iterates over a given Active Record Relation across `instances` parallel jobs, and enumerates on batches.

Child job `i` iterates over the batches of records whose id modulo `instances` equals `i` (id % instances == i).



# File 'lib/job-iteration/enumerator_builder.rb', line 157

def build_parallel_active_record_enumerator_on_batches(scope, instances:, cursor:, **args)
  build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
    build_active_record_enumerator(
      scope,
      cursor: inner_cursor,
      instances: instances,
      instance: instance,
      **args,
    ).batches
  end
end

#build_parallel_active_record_enumerator_on_records(scope, instances:, cursor:, **args) ⇒ Object Also known as: parallel_active_record_on_records

Builds an Enumerator that iterates over a given Active Record Relation across `instances` parallel jobs.

Child job `i` iterates over the records whose id modulo `instances` equals `i` (id % instances == i).



# File 'lib/job-iteration/enumerator_builder.rb', line 127

def build_parallel_active_record_enumerator_on_records(scope, instances:, cursor:, **args)
  build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
    build_active_record_enumerator(
      scope,
      cursor: inner_cursor,
      instances: instances,
      instance: instance,
      **args,
    ).records
  end
end

#build_parallel_array_enumerator(array, instances:, cursor:) ⇒ Object Also known as: parallel_array

Builds an Enumerator that iterates over a given array across `instances` parallel jobs.

Child job `i` iterates over the slice of the array starting at index (array.size.to_f / instances * i).floor and ending at index (array.size.to_f / instances * (i + 1)).floor - 1.



# File 'lib/job-iteration/enumerator_builder.rb', line 73

def build_parallel_array_enumerator(array, instances:, cursor:)
  unless array.is_a?(Array)
    raise ArgumentError, "array must be an Array"
  end

  build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
    slice_start = (array.size.to_f / instances * instance).floor
    next_slice_start = (array.size.to_f / instances * (instance + 1)).floor
    slice = array[slice_start...next_slice_start]
    build_array_enumerator(slice, cursor: inner_cursor)
  end
end
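The slice boundaries can be computed on their own. This sketch reuses the float arithmetic from the method body (`slice_bounds` is a hypothetical name) and shows that the slices partition the array with no gaps or overlaps, even when array.size is not a multiple of instances.

```ruby
# Slice arithmetic from build_parallel_array_enumerator (hypothetical helper name).
def slice_bounds(size, instances, instance)
  slice_start = (size.to_f / instances * instance).floor
  next_slice_start = (size.to_f / instances * (instance + 1)).floor
  slice_start...next_slice_start # exclusive range: the next slice starts where this one ends
end

array = (0...10).to_a
slices = (0...3).map { |i| array[slice_bounds(array.size, 3, i)] }
slices # => [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

Using to_f before dividing matters: with integer division, 10 / 3 would be 3, and the last slice would end at index 8, dropping the final element.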

#build_parallel_enumerator(instances:, cursor:, &block) ⇒ Object Also known as: parallel



# File 'lib/job-iteration/enumerator_builder.rb', line 236

def build_parallel_enumerator(instances:, cursor:, &block)
  unless instances.is_a?(Integer) && instances.positive?
    raise ArgumentError, "instances must be a positive Integer"
  end

  return ParallelEnumerator::EnqueueJobs.new(instances) if cursor.nil?

  enum = ParallelEnumerator.new(block, cursor: cursor).to_enum
  wrap(self, enum)
end

#build_throttle_enumerator(enumerable, throttle_on:, backoff:) ⇒ Object Also known as: throttle



# File 'lib/job-iteration/enumerator_builder.rb', line 181

def build_throttle_enumerator(enumerable, throttle_on:, backoff:)
  enum = JobIteration::ThrottleEnumerator.new(
    enumerable,
    @job,
    throttle_on: throttle_on,
    backoff: backoff,
  ).to_enum
  wrap(self, enum)
end

#build_times_enumerator(number, cursor:) ⇒ Object Also known as: times

Builds an Enumerator object that iterates number times, yielding integers starting from zero.

Raises:

  • (ArgumentError) if number is not an Integer


# File 'lib/job-iteration/enumerator_builder.rb', line 47

def build_times_enumerator(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

  wrap(self, build_array_enumerator(number.times.to_a, cursor: cursor))
end
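build_times_enumerator delegates to the array enumerator over number.times.to_a, so its cursor is the last completed index, and build_once_enumerator is the case number = 1. A plain-Ruby sketch of that composition, without wrapping (`times_pairs` is a hypothetical name):

```ruby
# Mirrors build_times_enumerator without wrapping (hypothetical helper name).
def times_pairs(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

  array = number.times.to_a
  drop = cursor.nil? ? 0 : cursor + 1 # resume after the last completed index
  array.each_with_index.drop(drop)
end

times_pairs(3, cursor: nil) # => [[0, 0], [1, 1], [2, 2]]
times_pairs(3, cursor: 1)   # => [[2, 2]]
times_pairs(1, cursor: 0)   # => [] (the "once" case after its single iteration)
```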