Module: Philiprehberger::ParallelEach
Defined in:
- lib/philiprehberger/parallel_each.rb
- lib/philiprehberger/parallel_each/version.rb
- lib/philiprehberger/parallel_each/worker_pool.rb
Overview
Parallel iteration with configurable thread pool and ordered results.
Defined Under Namespace
Classes: WorkerPool
Constant Summary
- VERSION = '0.3.0'
Class Method Summary
- .all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel all? with short-circuit behavior.
- .any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel any? with short-circuit behavior.
- .count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer
  Parallel count of matching elements.
- .each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable
  Parallel each that processes all items.
- .each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable
  Parallel each with index.
- .find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?
  Parallel find with short-circuit behavior.
- .flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel flat_map that preserves input order.
- .map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel map that preserves input order.
- .map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array
  Parallel map with index.
- .none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel none?, the complement of any?.
- .reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object
  Sequential reduction over the collection.
- .reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel reject (inverse of select) that preserves input order.
- .select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel select (filter) that preserves input order.
Class Method Details
.all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel all? with short-circuit behavior. Returns false as soon as any block invocation returns falsy.
# File 'lib/philiprehberger/parallel_each.rb', line 195
def self.all?(collection, concurrency: Etc.nprocessors, &block)
  return collection.all?(&block) if concurrency <= 1

  !any?(collection, concurrency: concurrency) { |item| !block.call(item) }
end
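The delegation in the listing above rests on a simple identity: every item satisfies the block exactly when no item fails it. The following is a minimal sketch of that identity using plain Enumerable methods, with no threads involved. The helper name `all_via_any?` is hypothetical and is not part of the gem.

```ruby
# Hypothetical helper (not part of the gem): expresses all? in terms of any?
# by negating both the predicate and the result.
def all_via_any?(items, &block)
  !items.any? { |item| !block.call(item) }
end

[[1, 2, 3], [1, -2, 3], []].each do |items|
  direct  = items.all?(&:positive?)
  derived = all_via_any?(items, &:positive?)
  puts "#{items.inspect}: all?=#{direct}, derived=#{derived}"
end
```

Because the work is handed to any?, a single falsy block result is enough to stop the search, which is where the short-circuit behavior comes from.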
.any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel any? with short-circuit behavior. Returns true as soon as any block invocation returns truthy.
# File 'lib/philiprehberger/parallel_each.rb', line 208
def self.any?(collection, concurrency: Etc.nprocessors, &block)
  return collection.any?(&block) if concurrency <= 1

  queue = Queue.new
  collection.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found = false
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if found

        _idx, item = work
        begin
          if block.call(item)
            mutex.synchronize { found = true }
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found
end
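The listing above also shows how errors travel: a worker that hits a StandardError records it under a mutex and stops, and the caller re-raises the first recorded error only after every thread has been joined. The sketch below isolates that pattern. The method name `run_workers` and the use of lambdas as jobs are assumptions made for illustration; this is not the gem's code.

```ruby
# Hypothetical helper (not part of the gem): runs callable jobs on a few
# threads and re-raises the first StandardError in the calling thread.
def run_workers(jobs, concurrency: 2)
  queue = Queue.new
  jobs.each { |job| queue << job }
  concurrency.times { queue << :stop } # one stop sentinel per worker

  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new(concurrency) do
    Thread.new do
      loop do
        job = queue.pop
        break if job == :stop

        begin
          job.call
        rescue StandardError => e
          # Keep only the first error; later ones are dropped.
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error

  :ok
end

begin
  run_workers([-> { :fine }, -> { raise ArgumentError, "boom" }])
rescue ArgumentError => e
  puts "re-raised in caller: #{e.message}"
end
```

Catching the error inside the thread keeps it from being lost or reported asynchronously, at the cost of surfacing only one error when several workers fail.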
.count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer
Parallel count of matching elements: the number of items for which the block returns a truthy value.
# File 'lib/philiprehberger/parallel_each.rb', line 101
def self.count(collection, concurrency: Etc.nprocessors, &block)
  results = map(collection, concurrency: concurrency, &block)
  results.count { |r| r }
end
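Since the listing above counts block results by truthiness, only nil and false are excluded; values such as 0 and the empty string still count. A minimal single-threaded sketch of the same map-then-count shape follows. The helper name `count_matching` is hypothetical.

```ruby
# Hypothetical helper (not part of the gem): map first, then count the
# results that Ruby treats as truthy (everything except nil and false).
def count_matching(items, &block)
  items.map(&block).count { |result| result }
end

puts count_matching([1, 2, 3, 4], &:even?)              # prints 2
puts count_matching(["a", "", nil]) { |s| s&.length }   # prints 2
```

In the second call the block returns 1, 0, and nil; the 0 is truthy in Ruby, so two of the three items are counted.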
.each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable
Parallel each that processes all items and returns the original collection.
# File 'lib/philiprehberger/parallel_each.rb', line 29
def self.each(collection, concurrency: Etc.nprocessors, &block)
  map(collection, concurrency: concurrency, &block)
  collection
end
.each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable
Parallel each with index. Passes (item, index) to the block.
# File 'lib/philiprehberger/parallel_each.rb', line 79
def self.each_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
  collection
end
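Both each_with_index and map_with_index build an array of [item, index] pairs up front and then unpack each pair for the caller's block. The sketch below shows that pairing step on its own, single-threaded. The helper name `map_pairs` is hypothetical.

```ruby
# Hypothetical helper (not part of the gem): materialize [item, index]
# pairs, then yield them to the block as two separate arguments.
def map_pairs(collection)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  arr.each_with_index.to_a.map { |pair| yield(pair[0], pair[1]) }
end

labels = map_pairs(%w[a b c]) { |item, index| "#{index}:#{item}" }
puts labels.inspect # prints ["0:a", "1:b", "2:c"]

sums = map_pairs(10..12) { |item, index| item + index }
puts sums.inspect # prints [10, 12, 14]
```

Attaching the index before the work is distributed means each block call sees the item's original position, regardless of which thread ends up running it.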
.find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?
Parallel find with short-circuit behavior. Returns the first element for which the block returns truthy, or nil.
# File 'lib/philiprehberger/parallel_each.rb', line 140
def self.find(collection, concurrency: Etc.nprocessors, &block)
  return collection.find(&block) if concurrency <= 1

  queue = Queue.new
  arr = collection.is_a?(Array) ? collection : collection.to_a
  arr.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found_item = nil
  found_index = nil
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if mutex.synchronize { !found_item.nil? }

        idx, item = work
        begin
          if block.call(item)
            mutex.synchronize do
              if found_item.nil? || idx < found_index
                found_item = item
                found_index = idx
              end
            end
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found_item
end
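The listing above keeps find deterministic by letting a later-finishing worker replace the recorded match only when its index is lower. The sketch below isolates that "lowest index wins" rule. The class `FirstMatch` is hypothetical, and unlike the listing it tracks "nothing found yet" through the index rather than the item, which is an assumption made here so that a nil element can still be recorded as a match.

```ruby
# Hypothetical class (not part of the gem): records the match with the
# lowest index, no matter in which order threads report their matches.
class FirstMatch
  attr_reader :index, :item

  def initialize
    @mutex = Mutex.new
    @index = nil
    @item = nil
  end

  def offer(index, item)
    @mutex.synchronize do
      if @index.nil? || index < @index
        @index = index
        @item = item
      end
    end
  end
end

winner = FirstMatch.new
threads = [[5, "f"], [2, "c"], [7, "h"]].map do |index, item|
  Thread.new { winner.offer(index, item) }
end
threads.each(&:join)
puts "#{winner.index}: #{winner.item}" # prints 2: c
```

Going by the listing, the gem's version tests `found_item.nil?` instead, so a matching element that is itself nil appears to be indistinguishable from "no match yet".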
.flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel flat_map that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 55
def self.flat_map(collection, concurrency: Etc.nprocessors, &block)
  return collection.flat_map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).flat_map { |r| Array(r.value) }
end
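Going by the listing above, the `concurrency <= 1` path delegates to Enumerable#flat_map, while the pooled path wraps every block result in `Array(...)` before flattening. For blocks that return arrays the two agree, but they can differ for nil or Hash results. The single-threaded sketch below compares the two shapes; both helper names are hypothetical.

```ruby
# Hypothetical helpers (not part of the gem) mirroring the two code paths.
def flat_map_direct(items, &block)
  items.flat_map(&block)
end

def flat_map_wrapped(items, &block)
  items.map(&block).flat_map { |value| Array(value) }
end

pairs_or_nil = ->(n) { n.odd? ? nil : [n, n * 10] }

p flat_map_direct([1, 2, 3], &pairs_or_nil)  # prints [nil, 2, 20, nil]
p flat_map_wrapped([1, 2, 3], &pairs_or_nil) # prints [2, 20]
```

If a block may return nil or a Hash, returning an explicit array from the block (for example `[]` instead of nil) gives the same result on both paths.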
.map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel map that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 16
def self.map(collection, concurrency: Etc.nprocessors, &block)
  return collection.map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).map(&:value)
end
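WorkerPool's source is not shown on this page; the listings only reveal that `run` returns objects responding to `index` and `value`. The sketch below is one way such a pool could preserve input order, under that assumption: tag each item with its index, let threads finish in any order, then sort by index. `SketchPool` and `SketchResult` are hypothetical names, and this is not the gem's implementation. Error handling is omitted for brevity.

```ruby
# Hypothetical stand-ins (not the gem's WorkerPool).
SketchResult = Struct.new(:index, :value)

class SketchPool
  def initialize(concurrency:)
    @concurrency = [concurrency, 1].max
  end

  def run(collection, &block)
    queue = Queue.new
    collection.each_with_index { |item, idx| queue << [idx, item] }
    @concurrency.times { queue << :stop } # one stop sentinel per worker

    results = []
    mutex = Mutex.new

    threads = Array.new(@concurrency) do
      Thread.new do
        loop do
          work = queue.pop
          break if work == :stop

          idx, item = work
          value = block.call(item)
          mutex.synchronize { results << SketchResult.new(idx, value) }
        end
      end
    end

    threads.each(&:join)
    results.sort_by(&:index) # completion order -> input order
  end
end

pool = SketchPool.new(concurrency: 4)
squares = pool.run(1..8) { |n| sleep(rand * 0.01); n * n }.map(&:value)
puts squares.inspect # prints [1, 4, 9, 16, 25, 36, 49, 64]
```

The random sleep makes workers finish out of order, yet the sorted results line up with the input positions.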
.map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array
Parallel map with index. Passes (item, index) to the block.
# File 'lib/philiprehberger/parallel_each.rb', line 68
def self.map_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
end
.none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel none?, the complement of any?.
# File 'lib/philiprehberger/parallel_each.rb', line 91
def self.none?(collection, concurrency: Etc.nprocessors, &block)
  !any?(collection, concurrency: concurrency, &block)
end
.reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object
Sequential reduction over the collection. The concurrency keyword is part of the signature, but the source listing for this method does not use it: items are folded one at a time, in order.
# File 'lib/philiprehberger/parallel_each.rb', line 113
def self.reduce(collection, initial, concurrency: Etc.nprocessors, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end
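A sequential fold matters for blocks that are not associative, such as subtraction, where the grouping of operations changes the answer. The sketch below mirrors the shape of the listing above in a hypothetical standalone method, `sequential_reduce`, to show that the result does not depend on the concurrency argument.

```ruby
# Hypothetical helper (not part of the gem): accepts a concurrency keyword
# for signature parity but folds strictly left to right.
def sequential_reduce(collection, initial, concurrency: 4, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end

# ((10 - 1) - 2) - 3
puts sequential_reduce(1..3, 10) { |acc, n| acc - n }                  # prints 4
puts sequential_reduce(1..3, 10, concurrency: 1) { |acc, n| acc - n }  # prints 4
```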
.reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel reject (inverse of select) that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 124
def self.reject(collection, concurrency: Etc.nprocessors, &block)
  return collection.reject(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.reject(&:value).map { |r| arr[r.index] }
end
.select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel select (filter) that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 40
def self.select(collection, concurrency: Etc.nprocessors, &block)
  return collection.select(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.select(&:value).map { |r| arr[r.index] }
end
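In the select and reject listings, the block's return value is used only as a filter flag; the returned array is built by looking up the original elements by index. The single-threaded sketch below shows that lookup step. `PredicateResult` and the three helper methods are hypothetical stand-ins for whatever the pool returns, assuming only that each result exposes `index` and `value`.

```ruby
# Hypothetical stand-ins (not part of the gem).
PredicateResult = Struct.new(:index, :value)

def predicate_results(arr, &block)
  arr.each_with_index.map { |item, idx| PredicateResult.new(idx, block.call(item)) }
end

def select_from(arr, results)
  results.select(&:value).map { |r| arr[r.index] }
end

def reject_from(arr, results)
  results.reject(&:value).map { |r| arr[r.index] }
end

words = %w[apple kiwi banana fig]
# The block returns a transformed string or nil; only its truthiness matters.
results = predicate_results(words) { |w| w.length > 4 ? w.upcase : nil }

puts select_from(words, results).inspect # prints ["apple", "banana"]
puts reject_from(words, results).inspect # prints ["kiwi", "fig"]
```

The output contains the original lowercase words, not the uppercase strings the block produced, which matches how Enumerable#select and #reject behave.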