Class: JobIteration::EnumeratorBuilder
- Inherits: Object
  - Object
    - JobIteration::EnumeratorBuilder
- Extended by: Forwardable
- Defined in: lib/job-iteration/enumerator_builder.rb
Defined Under Namespace
Classes: Wrapper
Instance Method Summary
- #build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args) ⇒ Object (also: #active_record_on_batch_relations)
  Builds an Enumerator from an Active Record Relation that enumerates on batches, yielding Active Record Relations.
- #build_active_record_enumerator_on_batches(scope, cursor:, **args) ⇒ Object (also: #active_record_on_batches)
  Builds an Enumerator from an Active Record Relation that enumerates on batches of records.
- #build_active_record_enumerator_on_records(scope, cursor:, **args) ⇒ Object (also: #active_record_on_records)
  Builds an Enumerator from an Active Record Relation.
- #build_array_enumerator(enumerable, cursor:) ⇒ Object (also: #array)
  Builds an Enumerator from a given array, using cursor as an offset.
- #build_csv_enumerator(enumerable, cursor:) ⇒ Object (also: #csv)
- #build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100) ⇒ Object (also: #csv_on_batches)
- #build_nested_enumerator(enums, cursor:) ⇒ Object (also: #nested)
  Builds an Enumerator for nested iteration.
- #build_once_enumerator(cursor:) ⇒ Object (also: #once)
  Builds an Enumerator that iterates once.
- #build_parallel_active_record_enumerator_on_batches(scope, instances:, cursor:, **args) ⇒ Object (also: #parallel_active_record_on_batches)
  Builds an Enumerator that iterates over a given Active Record Relation across instances parallel jobs, enumerating on batches.
- #build_parallel_active_record_enumerator_on_records(scope, instances:, cursor:, **args) ⇒ Object (also: #parallel_active_record_on_records)
  Builds an Enumerator that iterates over a given Active Record Relation across instances parallel jobs.
- #build_parallel_array_enumerator(array, instances:, cursor:) ⇒ Object (also: #parallel_array)
  Builds an Enumerator that iterates over a given array across instances parallel jobs.
- #build_parallel_enumerator(instances:, cursor:, &block) ⇒ Object (also: #parallel)
- #build_throttle_enumerator(enumerable, throttle_on:, backoff:) ⇒ Object (also: #throttle)
- #build_times_enumerator(number, cursor:) ⇒ Object (also: #times)
  Builds an Enumerator that iterates N times, yielding numbers starting from zero.
- #initialize(job, wrapper: Wrapper) ⇒ EnumeratorBuilder (constructor)
  A new instance of EnumeratorBuilder.
Constructor Details
#initialize(job, wrapper: Wrapper) ⇒ EnumeratorBuilder
Returns a new instance of EnumeratorBuilder.
    # File 'lib/job-iteration/enumerator_builder.rb', line 34
    def initialize(job, wrapper: Wrapper)
      @job = job
      @wrapper = wrapper
    end
Instance Method Details
#build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args) ⇒ Object Also known as: active_record_on_batch_relations
Builds an Enumerator from an Active Record Relation that enumerates on batches, yielding Active Record Relations. See the documentation for #build_active_record_enumerator_on_batches.
    # File 'lib/job-iteration/enumerator_builder.rb', line 171
    def build_active_record_enumerator_on_batch_relations(scope, wrap: true, cursor:, **args)
      enum = JobIteration::ActiveRecordBatchEnumerator.new(
        scope,
        cursor: cursor,
        **args,
      ).each
      enum = wrap(self, enum) if wrap
      enum
    end
#build_active_record_enumerator_on_batches(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_batches
Builds an Enumerator from an Active Record Relation that enumerates on batches of records. Each Enumerator tick moves the cursor batch_size rows forward.
batch_size: sets how many records are fetched in one batch. Defaults to 100.
For the remaining arguments, see the documentation for #build_active_record_enumerator_on_records.
    # File 'lib/job-iteration/enumerator_builder.rb', line 145
    def build_active_record_enumerator_on_batches(scope, cursor:, **args)
      enum = build_active_record_enumerator(
        scope,
        cursor: cursor,
        **args,
      ).batches
      wrap(self, enum)
    end
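To make "each tick moves the cursor batch_size rows forward" concrete, here is a minimal plain-Ruby sketch that pages through a sorted list of ids in batches and resumes after a cursor. It does not use job-iteration or Active Record: `id_batches` is a hypothetical helper. It also assumes the cursor is the id of the last row in the previously processed batch, in keyset style.

```ruby
# Hypothetical helper, not part of job-iteration: pages through sorted ids in
# batches and resumes after a cursor. Assumption: the cursor is the last id
# of the previously processed batch (keyset style).
def id_batches(sorted_ids, cursor:, batch_size: 100)
  remaining = cursor.nil? ? sorted_ids : sorted_ids.select { |id| id > cursor }

  Enumerator.new do |yielder|
    remaining.each_slice(batch_size) do |batch|
      # Each tick yields one batch plus the cursor to persist, so the cursor
      # moves batch_size rows forward (fewer on a final, partial batch).
      yielder << [batch, batch.last]
    end
  end
end

ids = (1..7).to_a
p id_batches(ids, cursor: nil, batch_size: 3).to_a # => [[[1, 2, 3], 3], [[4, 5, 6], 6], [[7], 7]]
p id_batches(ids, cursor: 3, batch_size: 3).to_a   # => [[[4, 5, 6], 6], [[7], 7]]
```

A job interrupted after the first batch would persist cursor 3. On resume, it skips straight to the second batch instead of re-reading rows 1 to 3.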
#build_active_record_enumerator_on_records(scope, cursor:, **args) ⇒ Object Also known as: active_record_on_records
Builds an Enumerator from an Active Record Relation. Each Enumerator tick moves the cursor one row forward.
The columns: argument is used to build the actual query for iteration. It defaults to the primary key:
1) SELECT * FROM users ORDER BY id LIMIT 100
When iteration is resumed, cursor: and columns: values will be used to continue from the point where iteration stopped:
2) SELECT * FROM users WHERE id > $CURSOR ORDER BY id LIMIT 100
columns: can also take more than one column. In that case, the cursor will contain the serialized values of all columns at the point where iteration stopped.
Consider this example with columns: [:created_at, :id]. Here’s the query it will use on the first iteration:
1) SELECT * FROM `products` ORDER BY created_at, id LIMIT 100
And the query on the next iteration:
2) SELECT * FROM `products`
WHERE (created_at > '$LAST_CREATED_AT_CURSOR'
OR (created_at = '$LAST_CREATED_AT_CURSOR' AND (id > '$LAST_ID_CURSOR')))
ORDER BY created_at, id LIMIT 100
As a result of this query pattern, if the values in these columns change for records in scope during iteration, those records may be skipped or yielded more than once, depending on the nature of the update and the cursor’s value. If a record that has already been yielded is updated to a value greater than the cursor’s value, it will be yielded again. Similarly, if a record that has not yet been yielded is updated to a value less than the cursor’s value, it will be skipped.
    # File 'lib/job-iteration/enumerator_builder.rb', line 115
    def build_active_record_enumerator_on_records(scope, cursor:, **args)
      enum = build_active_record_enumerator(
        scope,
        cursor: cursor,
        **args,
      ).records
      wrap(self, enum)
    end
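The keyset query pattern described above for columns: [:created_at, :id] can be simulated in memory to see how the composite cursor resumes iteration. This is a sketch only. The `Row` struct and the `next_page` helper are hypothetical, integers stand in for timestamps, and no SQL or Active Record is involved.

```ruby
# Hypothetical in-memory rows standing in for the `products` table;
# integers stand in for created_at timestamps.
Row = Struct.new(:id, :created_at)

# Mirrors the WHERE clause shown above for columns: [:created_at, :id].
# cursor is [last_created_at, last_id], or nil on the first iteration.
def next_page(rows, cursor, limit: 100)
  candidates =
    if cursor.nil?
      rows
    else
      last_created_at, last_id = cursor
      rows.select do |r|
        r.created_at > last_created_at ||
          (r.created_at == last_created_at && r.id > last_id)
      end
    end
  # ORDER BY created_at, id LIMIT limit
  candidates.sort_by { |r| [r.created_at, r.id] }.first(limit)
end

rows = [Row.new(3, 10), Row.new(1, 20), Row.new(2, 10), Row.new(4, 20)]
page1 = next_page(rows, nil, limit: 2)
cursor = [page1.last.created_at, page1.last.id]
page2 = next_page(rows, cursor, limit: 2)
p page1.map(&:id) # => [2, 3]
p page2.map(&:id) # => [1, 4]
```

Suppose row 2 were updated to created_at 30 after the first page was read. It would then satisfy created_at > 10 and be returned again by a later call, which is the duplication described above.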
#build_array_enumerator(enumerable, cursor:) ⇒ Object Also known as: array
Builds an Enumerator from a given array, using cursor as an offset.
    # File 'lib/job-iteration/enumerator_builder.rb', line 54
    def build_array_enumerator(enumerable, cursor:)
      unless enumerable.is_a?(Array)
        raise ArgumentError, "enumerable must be an Array"
      end

      drop = if cursor.nil?
        0
      else
        cursor + 1
      end

      wrap(self, enumerable.each_with_index.drop(drop).to_enum { enumerable.size - drop })
    end
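The cursor-as-offset rule can be reproduced without the job wrapper to show what a resumed job sees. The sketch below copies the core expression from build_array_enumerator. `array_enum_sketch` is a hypothetical name, and unlike the original it clamps the size block at zero.

```ruby
# Sketch of build_array_enumerator's cursor-as-offset logic without the job
# wrapper. The cursor is the index of the last processed element, so a
# resumed job starts at cursor + 1.
def array_enum_sketch(array, cursor:)
  raise ArgumentError, "enumerable must be an Array" unless array.is_a?(Array)

  drop = cursor.nil? ? 0 : cursor + 1
  # Yields [element, index] pairs; the index becomes the next cursor.
  # The lazily computed size is clamped at zero here, unlike the original.
  array.each_with_index.drop(drop).to_enum { [array.size - drop, 0].max }
end

letters = %w[a b c d]
p array_enum_sketch(letters, cursor: nil).to_a # => [["a", 0], ["b", 1], ["c", 2], ["d", 3]]
p array_enum_sketch(letters, cursor: 1).to_a   # => [["c", 2], ["d", 3]]
p array_enum_sketch(letters, cursor: 1).size   # => 2
```

Because the cursor is an index rather than a value, inserting or removing elements before the cursor between runs would shift which element a resumed job continues from.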
#build_csv_enumerator(enumerable, cursor:) ⇒ Object Also known as: csv
    # File 'lib/job-iteration/enumerator_builder.rb', line 191
    def build_csv_enumerator(enumerable, cursor:)
      enum = CsvEnumerator.new(enumerable).rows(cursor: cursor)
      wrap(self, enum)
    end
#build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100) ⇒ Object Also known as: csv_on_batches
    # File 'lib/job-iteration/enumerator_builder.rb', line 196
    def build_csv_enumerator_on_batches(enumerable, cursor:, batch_size: 100)
      enum = CsvEnumerator.new(enumerable).batches(cursor: cursor, batch_size: batch_size)
      wrap(self, enum)
    end
#build_nested_enumerator(enums, cursor:) ⇒ Object Also known as: nested
Builds an Enumerator for nested iteration.
    # File 'lib/job-iteration/enumerator_builder.rb', line 231
    def build_nested_enumerator(enums, cursor:)
      enum = NestedEnumerator.new(enums, cursor: cursor).each
      wrap(self, enum)
    end
#build_once_enumerator(cursor:) ⇒ Object Also known as: once
Builds an Enumerator that iterates once.
    # File 'lib/job-iteration/enumerator_builder.rb', line 42
    def build_once_enumerator(cursor:)
      wrap(self, build_times_enumerator(1, cursor: cursor))
    end
#build_parallel_active_record_enumerator_on_batches(scope, instances:, cursor:, **args) ⇒ Object Also known as: parallel_active_record_on_batches
Builds an Enumerator that iterates over a given Active Record Relation, across instances parallel jobs, and enumerates on batches.
Child job i iterates over the batches of records whose id satisfies id % instances == i.
    # File 'lib/job-iteration/enumerator_builder.rb', line 157
    def build_parallel_active_record_enumerator_on_batches(scope, instances:, cursor:, **args)
      build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
        build_active_record_enumerator(
          scope,
          cursor: inner_cursor,
          instances: instances,
          instance: instance,
          **args,
        ).batches
      end
    end
#build_parallel_active_record_enumerator_on_records(scope, instances:, cursor:, **args) ⇒ Object Also known as: parallel_active_record_on_records
Builds an Enumerator that iterates over a given Active Record Relation, across instances parallel jobs.
Child job i iterates over the records whose id satisfies id % instances == i.
    # File 'lib/job-iteration/enumerator_builder.rb', line 127
    def build_parallel_active_record_enumerator_on_records(scope, instances:, cursor:, **args)
      build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
        build_active_record_enumerator(
          scope,
          cursor: inner_cursor,
          instances: instances,
          instance: instance,
          **args,
        ).records
      end
    end
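The description above assigns records to child jobs by id modulo instances. The sketch below assumes this means child job i (0-based) owns the records whose id % instances == i. It shows that this rule partitions a set of ids with no gaps or overlap. `partition_ids` is a hypothetical helper that works on plain integers, not on an Active Record scope.

```ruby
# Assumption: instance i (0-based) owns the records whose id % instances == i.
# partition_ids is a hypothetical helper operating on plain integer ids.
def partition_ids(ids, instances:, instance:)
  unless instances.is_a?(Integer) && instances.positive?
    raise ArgumentError, "instances must be a positive Integer"
  end

  ids.select { |id| id % instances == instance }
end

ids = (1..10).to_a
parts = (0...3).map { |i| partition_ids(ids, instances: 3, instance: i) }
p parts # => [[3, 6, 9], [1, 4, 7, 10], [2, 5, 8]]
```

Every id falls into exactly one partition, so together the instances cover the scope without overlap. How evenly the work is split depends on how the ids are distributed.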
#build_parallel_array_enumerator(array, instances:, cursor:) ⇒ Object Also known as: parallel_array
Builds an Enumerator that iterates over a given array, across instances parallel jobs.
Child job i iterates over the slice of the array starting at index (array.size / instances * i).floor and ending at index (array.size / instances * (i + 1)).floor - 1.
    # File 'lib/job-iteration/enumerator_builder.rb', line 73
    def build_parallel_array_enumerator(array, instances:, cursor:)
      unless array.is_a?(Array)
        raise ArgumentError, "array must be an Array"
      end

      build_parallel_enumerator(instances: instances, cursor: cursor) do |instance, instances, inner_cursor|
        slice_start = (array.size.to_f / instances * instance).floor
        next_slice_start = (array.size.to_f / instances * (instance + 1)).floor
        slice = array[slice_start...next_slice_start]
        build_array_enumerator(slice, cursor: inner_cursor)
      end
    end
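The slice arithmetic above can be checked in isolation. In the sketch below, `float_slice` copies the floating-point bounds from build_parallel_array_enumerator. `integer_slice` is a hypothetical alternative, not part of the library, that uses exact integer division. The sketch also shows an edge case: floating-point rounding can push the last bound just below array.size.

```ruby
# float_slice mirrors the arithmetic in build_parallel_array_enumerator.
def float_slice(array, instances:, instance:)
  slice_start = (array.size.to_f / instances * instance).floor
  next_slice_start = (array.size.to_f / instances * (instance + 1)).floor
  array[slice_start...next_slice_start]
end

# integer_slice is a hypothetical alternative using exact integer division;
# for the last instance the end bound is always exactly array.size.
def integer_slice(array, instances:, instance:)
  slice_start = array.size * instance / instances
  next_slice_start = array.size * (instance + 1) / instances
  array[slice_start...next_slice_start]
end

numbers = (0...10).to_a
slices = (0...3).map { |i| float_slice(numbers, instances: 3, instance: i) }
p slices # => [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

# Floating-point rounding: 1.0 / 49 * 49 evaluates to 0.9999999999999999, so
# the float version assigns a one-element array to no instance at all.
lost = (0...49).flat_map { |i| float_slice([:x], instances: 49, instance: i) }
kept = (0...49).flat_map { |i| integer_slice([:x], instances: 49, instance: i) }
p lost # => []
p kept # => [:x]
```

For typical sizes, the two versions produce the same slices. The integer form only differs where the float product lands just below a whole number.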
#build_parallel_enumerator(instances:, cursor:, &block) ⇒ Object Also known as: parallel
    # File 'lib/job-iteration/enumerator_builder.rb', line 236
    def build_parallel_enumerator(instances:, cursor:, &block)
      unless instances.is_a?(Integer) && instances.positive?
        raise ArgumentError, "instances must be a positive Integer"
      end

      return ParallelEnumerator::EnqueueJobs.new(instances) if cursor.nil?

      enum = ParallelEnumerator.new(block, cursor: cursor).to_enum
      wrap(self, enum)
    end
#build_throttle_enumerator(enumerable, throttle_on:, backoff:) ⇒ Object Also known as: throttle
    # File 'lib/job-iteration/enumerator_builder.rb', line 181
    def build_throttle_enumerator(enumerable, throttle_on:, backoff:)
      enum = JobIteration::ThrottleEnumerator.new(
        enumerable,
        @job,
        throttle_on: throttle_on,
        backoff: backoff,
      ).to_enum
      wrap(self, enum)
    end
#build_times_enumerator(number, cursor:) ⇒ Object Also known as: times
Builds an Enumerator that iterates N times, yielding numbers starting from zero.
    # File 'lib/job-iteration/enumerator_builder.rb', line 47
    def build_times_enumerator(number, cursor:)
      raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

      wrap(self, build_array_enumerator(number.times.to_a, cursor: cursor))
    end
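Because build_times_enumerator delegates to build_array_enumerator over `number.times.to_a`, its resume behaviour follows the same offset rule. This unwrapped sketch illustrates it. `times_enum_sketch` is a hypothetical name, and the pair shape shown is what the sketch yields before any wrapper is applied.

```ruby
# Sketch mirroring build_times_enumerator: 0...number becomes an array that is
# then walked with the same cursor-as-offset rule as build_array_enumerator.
def times_enum_sketch(number, cursor:)
  raise ArgumentError, "First argument must be an Integer" unless number.is_a?(Integer)

  drop = cursor.nil? ? 0 : cursor + 1
  # Each element equals its own index, so the pairs look like [n, n].
  number.times.to_a.each_with_index.drop(drop).each
end

p times_enum_sketch(3, cursor: nil).to_a # => [[0, 0], [1, 1], [2, 2]]
p times_enum_sketch(3, cursor: 0).to_a   # => [[1, 1], [2, 2]]
p times_enum_sketch(1, cursor: 0).to_a   # => []
```

build_once_enumerator builds on the number = 1 case. As the last line shows, once its single tick has been processed (cursor 0), a resumed run has nothing left to iterate.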