Module: Philiprehberger::ParallelEach
Defined in:
- lib/philiprehberger/parallel_each.rb
- lib/philiprehberger/parallel_each/version.rb
- lib/philiprehberger/parallel_each/worker_pool.rb
Overview
Parallel iteration with configurable thread pool and ordered results.
Defined Under Namespace
Classes: WorkerPool
Constant Summary
- VERSION = '0.5.0'
Class Method Summary
- .all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel all? with short-circuit behavior.
- .any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel any? with short-circuit behavior.
- .count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer
  Parallel count of matching elements.
- .each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable
  Parallel each that processes all items.
- .each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable
  Parallel each with index.
- .find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?
  Parallel find with short-circuit behavior.
- .flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel flat_map that preserves input order.
- .last_stats ⇒ Hash?
  Snapshot of stats from the most recent parallel run, or nil if no run yet.
- .map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel map that preserves input order.
- .map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array
  Parallel map with index.
- .none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
  Parallel none? — complement of any?.
- .partition(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array<Array>
  Parallel partition.
- .reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object
  Sequential reduction over the collection.
- .reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel reject (inverse of select) that preserves input order.
- .select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
  Parallel select (filter) that preserves input order.
Class Method Details
.all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel all? with short-circuit behavior. Returns false as soon as any block invocation returns falsy.
# File 'lib/philiprehberger/parallel_each.rb', line 229
def self.all?(collection, concurrency: Etc.nprocessors, &block)
  return collection.all?(&block) if concurrency <= 1

  !any?(collection, concurrency: concurrency) { |item| !block.call(item) }
end
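The listing above derives all? from any? by negating both the block and the result. A minimal sketch of that equivalence using plain Enumerable methods follows; it uses no threads and is not the library's code, and `all_via_any?` is a hypothetical helper name.

```ruby
# Hypothetical helper: "all items pass" is the same as
# "no item fails", so all? can be written in terms of any?.
def all_via_any?(items)
  !items.any? { |item| !yield(item) }
end

p all_via_any?([2, 4, 6]) { |n| n.even? }
p all_via_any?([2, 4, 7]) { |n| n.even? }
```

One consequence of this shape is that all? stops early under the same condition as any?: the first falsy block result makes the negated block truthy.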
.any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel any? with short-circuit behavior. Returns true as soon as any block invocation returns truthy.
# File 'lib/philiprehberger/parallel_each.rb', line 242
def self.any?(collection, concurrency: Etc.nprocessors, &block)
  return collection.any?(&block) if concurrency <= 1

  queue = Queue.new
  collection.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found = false
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if found

        _idx, item = work
        begin
          if block.call(item)
            mutex.synchronize { found = true }
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error

  found
end
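The listing above records the first StandardError raised in any worker and re-raises it only after every thread has been joined. The sketch below isolates that pattern under simplifying assumptions (one thread per job, no queue); `run_and_reraise` and `jobs` are hypothetical names, not part of the library.

```ruby
# Hypothetical helper: run each callable in its own thread, remember the
# first StandardError under a mutex, and re-raise it after all joins.
def run_and_reraise(jobs)
  first_error = nil
  error_mutex = Mutex.new

  threads = jobs.map do |job|
    Thread.new do
      begin
        job.call
      rescue StandardError => e
        # ||= keeps only the first error that reaches the mutex.
        error_mutex.synchronize { first_error ||= e }
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error

  :ok
end

begin
  run_and_reraise([-> { 1 + 1 }, -> { raise ArgumentError, "boom" }])
rescue ArgumentError => e
  puts "re-raised on the calling thread: #{e.message}"
end
```

Deferring the raise until after the joins means no worker is left running when the caller sees the exception.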
.count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer
Parallel count of matching elements.
# File 'lib/philiprehberger/parallel_each.rb', line 117
def self.count(collection, concurrency: Etc.nprocessors, &block)
  results = map(collection, concurrency: concurrency, &block)
  results.count { |r| r }
end
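In the listing above, the final step counts the truthy entries among the block results. A small sketch of that step alone, with `truthy_count` as a hypothetical helper name, shows that only nil and false are excluded:

```ruby
# Hypothetical helper mirroring `results.count { |r| r }` from the listing.
def truthy_count(results)
  results.count { |r| r }
end

# In Ruby, 0 and "x" are truthy; only nil and false are not.
p truthy_count([1, nil, false, "x", 0])
p truthy_count([1, 2, 3, 4].map(&:even?))
```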
.each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable
Parallel each that processes all items.
# File 'lib/philiprehberger/parallel_each.rb', line 45
def self.each(collection, concurrency: Etc.nprocessors, &block)
  map(collection, concurrency: concurrency, &block)
  collection
end
.each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable
Parallel each with index. Passes (item, index) to the block.
# File 'lib/philiprehberger/parallel_each.rb', line 95
def self.each_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
  collection
end
.find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?
Parallel find with short-circuit behavior. Returns the first element for which the block returns truthy, or nil.
# File 'lib/philiprehberger/parallel_each.rb', line 174
def self.find(collection, concurrency: Etc.nprocessors, &block)
  return collection.find(&block) if concurrency <= 1

  queue = Queue.new
  arr = collection.is_a?(Array) ? collection : collection.to_a
  arr.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found_item = nil
  found_index = nil
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if mutex.synchronize { !found_item.nil? }

        idx, item = work
        begin
          if block.call(item)
            mutex.synchronize do
              if found_item.nil? || idx < found_index
                found_item = item
                found_index = idx
              end
            end
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error

  found_item
end
.flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel flat_map that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 71
def self.flat_map(collection, concurrency: Etc.nprocessors, &block)
  return collection.flat_map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).flat_map { |r| Array(r.value) }
end
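In the listing above, the `concurrency <= 1` path delegates to Enumerable#flat_map, while the pooled path wraps each value with Kernel#Array before flattening. Assuming `r.value` holds the block's return value (WorkerPool is not shown here), the two paths may differ when a block returns nil or a Hash. The sketch below compares the two flattening styles on plain values; both helper names are hypothetical.

```ruby
# Hypothetical helper: flatten the way the pooled path does, via Kernel#Array.
def flatten_with_array_wrap(values)
  values.flat_map { |v| Array(v) }
end

# Hypothetical helper: flatten the way Enumerable#flat_map does on its own.
def flatten_plain(values)
  values.flat_map { |v| v }
end

values = [[1, 2], 3, nil, { a: 1 }]

# Array(nil) is [], and Array(hash) is a list of [key, value] pairs.
p flatten_with_array_wrap(values)
# Plain flat_map only flattens arrays; nil and the hash pass through.
p flatten_plain(values)
```

If a block can return nil or a Hash, it may be worth checking results at both `concurrency: 1` and higher values.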
.last_stats ⇒ Hash?
Snapshot of stats from the most recent parallel run, or nil if no run yet.
# File 'lib/philiprehberger/parallel_each.rb', line 16
def self.last_stats
  @last_stats_mutex.synchronize { @last_stats }
end
.map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel map that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 32
def self.map(collection, concurrency: Etc.nprocessors, &block)
  return collection.map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).map(&:value)
end
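WorkerPool's source is not shown on this page, so how it keeps results in input order is not documented here. One common way to get an order-preserving parallel map is to tag each item with its index and write each result into a preallocated slot. The sketch below illustrates that general technique only; `ordered_parallel_map` and its `workers:` keyword are hypothetical and should not be read as the library's implementation.

```ruby
# Hypothetical sketch: order-preserving parallel map using index-tagged work.
def ordered_parallel_map(items, workers: 4)
  queue = Queue.new
  items.each_with_index { |item, idx| queue << [idx, item] }
  workers.times { queue << :stop } # one sentinel per worker thread

  results = Array.new(items.size)
  results_mutex = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop

        idx, item = work
        value = yield(item)
        # Store by index so completion order does not affect output order.
        results_mutex.synchronize { results[idx] = value }
      end
    end
  end

  threads.each(&:join)
  results
end

# Slower items finish later but still land in their original positions.
p ordered_parallel_map([3, 1, 2]) { |n| sleep(n * 0.01); n * n }
```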
.map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array
Parallel map with index. Passes (item, index) to the block.
# File 'lib/philiprehberger/parallel_each.rb', line 84
def self.map_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
end
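The listing above turns the collection into `[item, index]` pairs and then unpacks each pair before yielding. A sequential sketch of the same pairing step, using plain Array#map in place of the parallel map and a hypothetical helper name `map_pairs`:

```ruby
# Hypothetical helper: build [item, index] pairs, then yield both parts.
def map_pairs(items)
  pairs = items.each_with_index.to_a # e.g. [["a", 0], ["b", 1], ["c", 2]]
  pairs.map { |pair| yield(pair[0], pair[1]) }
end

p map_pairs(%w[a b c]) { |item, index| "#{index}:#{item}" }
```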
.none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean
Parallel none? — complement of any?.
# File 'lib/philiprehberger/parallel_each.rb', line 107
def self.none?(collection, concurrency: Etc.nprocessors, &block)
  !any?(collection, concurrency: concurrency, &block)
end
.partition(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array<Array>
Parallel partition. Evaluates the block on every element and returns `[truthy_items, falsy_items]`, preserving input order within each array.
# File 'lib/philiprehberger/parallel_each.rb', line 156
def self.partition(collection, concurrency: Etc.nprocessors, &block)
  return collection.partition(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  truthy = results.select(&:value).map { |r| arr[r.index] }
  falsy = results.reject(&:value).map { |r| arr[r.index] }
  [truthy, falsy]
end
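The listing above splits pool results by their `value` and maps each one back to the original item through its `index`. The sketch below reproduces only that splitting step. It assumes results arrive in input order and respond to `index` and `value`, as the listing suggests; `SketchResult` and `split_by_value` are hypothetical stand-ins, not the library's types.

```ruby
# Hypothetical stand-in for the result objects returned by the pool.
SketchResult = Struct.new(:index, :value)

# Hypothetical helper: split items by the truthiness of each result value.
def split_by_value(items, results)
  truthy = results.select(&:value).map { |r| items[r.index] }
  falsy  = results.reject(&:value).map { |r| items[r.index] }
  [truthy, falsy]
end

items = [5, 8, 11, 14]
results = items.each_with_index.map { |n, i| SketchResult.new(i, n.even?) }

p split_by_value(items, results)
```

The same index lookup appears in the select and reject listings, which each keep only one of the two halves.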
.reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object
Sequential reduction over the collection. The concurrency keyword is accepted but not used by the implementation shown below; items are folded in order on the calling thread.
# File 'lib/philiprehberger/parallel_each.rb', line 129
def self.reduce(collection, initial, concurrency: Etc.nprocessors, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end
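In the listing above, the `concurrency` keyword is not referenced in the method body, so the fold always runs left to right. The sketch below uses plain Enumerable#reduce to show why a strict order matters for operations that are not commutative; `fold_in_order` is a hypothetical helper name.

```ruby
# Hypothetical helper: convert to an Array if needed, then fold left to right.
def fold_in_order(collection, initial, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end

# Subtraction depends on order: ((100 - 10) - 2) - 3.
p fold_in_order([10, 2, 3], 100) { |acc, n| acc - n }
# Non-Array enumerables such as a Range are converted with to_a first.
p fold_in_order(1..4, 0) { |acc, n| acc + n }
```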
.reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel reject (inverse of select) that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 140
def self.reject(collection, concurrency: Etc.nprocessors, &block)
  return collection.reject(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.reject(&:value).map { |r| arr[r.index] }
end
.select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array
Parallel select (filter) that preserves input order.
# File 'lib/philiprehberger/parallel_each.rb', line 56
def self.select(collection, concurrency: Etc.nprocessors, &block)
  return collection.select(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.select(&:value).map { |r| arr[r.index] }
end