Module: Philiprehberger::ParallelEach

Defined in:
lib/philiprehberger/parallel_each.rb,
lib/philiprehberger/parallel_each/version.rb,
lib/philiprehberger/parallel_each/worker_pool.rb

Overview

Parallel iteration with configurable thread pool and ordered results.

Defined Under Namespace

Classes: WorkerPool

Constant Summary

VERSION = '0.5.0'

Class Method Details

.all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

Parallel all? with short-circuit behavior. Returns false as soon as any block invocation returns falsy.

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 229

def self.all?(collection, concurrency: Etc.nprocessors, &block)
  return collection.all?(&block) if concurrency <= 1

  !any?(collection, concurrency: concurrency) { |item| !block.call(item) }
end
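
The delegation above relies on De Morgan's law: every item passes exactly when no item fails. A minimal sequential sketch of that identity follows, with plain Enumerable#any? standing in for the parallel version. The helper name all_via_any is hypothetical and not part of the gem.

```ruby
# Hypothetical helper: expresses all? through any?, mirroring the
# delegation shown above. Enumerable#any? stands in for the parallel any?.
def all_via_any(collection, &block)
  !collection.any? { |item| !block.call(item) }
end

all_via_any([2, 4, 6], &:even?) # every item passes
all_via_any([2, 3, 6], &:even?) # 3 fails the test
all_via_any([], &:even?)        # vacuously true for an empty collection
```

One consequence is that an empty collection yields true, which matches Enumerable#all?.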

.any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

Parallel any? with short-circuit behavior. Returns true as soon as any block invocation returns truthy.

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 242

def self.any?(collection, concurrency: Etc.nprocessors, &block)
  return collection.any?(&block) if concurrency <= 1

  queue = Queue.new
  collection.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found = false
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if found

        _idx, item = work

        begin
          if block.call(item)
            mutex.synchronize { found = true }
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found
end
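
The listing above combines a work queue, one stop sentinel per worker, and a shared flag that lets idle workers bail out early. The following is a reduced sketch of that pattern using only the standard library. The name short_circuit_any? and the workers: keyword are hypothetical, and a unique sentinel object replaces :stop because this sketch enqueues raw items rather than [idx, item] pairs.

```ruby
# Hypothetical reduction of the queue-plus-sentinel pattern; not the gem's API.
def short_circuit_any?(items, workers: 4, &block)
  stop = Object.new # unique sentinel that no real item can be identical to
  queue = Queue.new
  items.each { |item| queue << item }
  workers.times { queue << stop } # one sentinel per worker so each can exit

  found = false
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      loop do
        work = queue.pop
        break if work.equal?(stop)
        break if lock.synchronize { found } # another worker already matched

        if block.call(work)
          lock.synchronize { found = true }
          break
        end
      end
    end
  end

  threads.each(&:join)
  found
end

short_circuit_any?(1..20) { |n| (n % 7).zero? } # a multiple of 7 exists
short_circuit_any?(1..20) { |n| n > 100 }       # no item matches
```

Short-circuiting here means that workers stop picking up new items once a match is recorded. Block calls that are already running still finish.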

.count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer

Parallel count of matching elements.

Parameters:

  • collection (Enumerable)

    items to count

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to count the item

Returns:

  • (Integer)


# File 'lib/philiprehberger/parallel_each.rb', line 117

def self.count(collection, concurrency: Etc.nprocessors, &block)
  results = map(collection, concurrency: concurrency, &block)
  results.count { |r| r }
end

.each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable

Parallel each that processes all items.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block to execute for each item

Returns:

  • (Enumerable)

    the original collection



# File 'lib/philiprehberger/parallel_each.rb', line 45

def self.each(collection, concurrency: Etc.nprocessors, &block)
  map(collection, concurrency: concurrency, &block)
  collection
end

.each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable

Parallel each with index. Passes (item, index) to the block.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item, index)

    block to execute for each item with its index

Returns:

  • (Enumerable)

    the original collection



# File 'lib/philiprehberger/parallel_each.rb', line 95

def self.each_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
  collection
end

.find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?

Parallel find with short-circuit behavior. Returns the first element (the one with the lowest input index) for which the block returns truthy, or nil if no element matches.

Parameters:

  • collection (Enumerable)

    items to search

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Object, nil)

    the first matching element or nil



# File 'lib/philiprehberger/parallel_each.rb', line 174

def self.find(collection, concurrency: Etc.nprocessors, &block)
  return collection.find(&block) if concurrency <= 1

  queue = Queue.new
  arr = collection.is_a?(Array) ? collection : collection.to_a
  arr.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found_item = nil
  found_index = nil
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if mutex.synchronize { !found_item.nil? }

        idx, item = work

        begin
          if block.call(item)
            mutex.synchronize do
              if found_item.nil? || idx < found_index
                found_item = item
                found_index = idx
              end
            end
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found_item
end
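
Because workers finish in an unpredictable order, the listing above compares indices before recording a match. The sketch below isolates that step with a hypothetical set of three matches reported out of order. It tracks the found state through the index rather than the item, an assumption made here so that a nil element could still count as a match. The source checks found_item.nil? instead.

```ruby
# Hypothetical scenario: three workers report matches out of order.
# The index comparison keeps the match that came first in the input.
reported = [[5, 'f'], [2, 'c'], [7, 'h']] # [index, item] pairs

best_index = nil
best_item = nil
lock = Mutex.new

threads = reported.map do |idx, item|
  Thread.new do
    lock.synchronize do
      # Track "found" via the index so that a nil item can still be a match.
      if best_index.nil? || idx < best_index
        best_index = idx
        best_item = item
      end
    end
  end
end
threads.each(&:join)

best_item # the item at index 2, regardless of thread scheduling
```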

.flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel flat_map that preserves input order.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns an array (or single value) for each item

Returns:

  • (Array)

    flattened results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 71

def self.flat_map(collection, concurrency: Etc.nprocessors, &block)
  return collection.flat_map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).flat_map { |r| Array(r.value) }
end
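
The parallel branch above passes each block result through Kernel#Array before concatenating. A short standard-library sketch of what that conversion does follows. The sample values are illustrative only.

```ruby
# Kernel#Array wraps or converts each block result before concatenation.
Array([1, 2])   # arrays pass through unchanged
Array(3)        # a single value is wrapped in a one-element array
Array(nil)      # nil becomes an empty array, so it vanishes when flattened
Array({ a: 1 }) # a hash is converted to key/value pairs

results = [[1, 2], 3, nil]
flattened = results.flat_map { |value| Array(value) }

# Plain Enumerable#flat_map keeps nil, which is what the
# concurrency <= 1 branch would do with the same block results.
sequential = results.flat_map { |value| value }
```

If this reading of the source is right, a block that returns nil or a Hash can produce different output in the sequential and parallel branches. Returning arrays consistently avoids the difference.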

.last_stats ⇒ Hash?

Snapshot of stats from the most recent parallel run, or nil if no run yet.

Returns:

  • (Hash, nil)

    hash with keys :workers, :completed, :failed, :elapsed_seconds



# File 'lib/philiprehberger/parallel_each.rb', line 16

def self.last_stats
  @last_stats_mutex.synchronize { @last_stats }
end

.map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel map that preserves input order.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block to execute for each item

Returns:

  • (Array)

    results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 32

def self.map(collection, concurrency: Etc.nprocessors, &block)
  return collection.map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).map(&:value)
end
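
WorkerPool itself is not shown on this page, so the following is only a guess at how a thread pool can return results in input order: each worker stores its value in the slot given by the item's index. The name ordered_parallel_map and the workers: keyword are hypothetical.

```ruby
# Hypothetical sketch; WorkerPool's real implementation is not shown here.
def ordered_parallel_map(items, workers: 4, &block)
  results = []
  lock = Mutex.new
  queue = Queue.new
  items.each_with_index { |item, idx| queue << [idx, item] }
  queue.close # pop returns nil once the closed queue is drained

  threads = Array.new(workers) do
    Thread.new do
      while (pair = queue.pop)
        idx, item = pair
        value = block.call(item)
        lock.synchronize { results[idx] = value } # slot chosen by input index
      end
    end
  end

  threads.each(&:join)
  results
end

# Later items finish first here, yet the output follows input order.
ordered_parallel_map([3, 1, 2]) do |n|
  sleep(n * 0.01)
  n * 10
end
```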

.map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array

Parallel map with index. Passes (item, index) to the block.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item, index)

    block to execute for each item with its index

Returns:

  • (Array)

    results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 84

def self.map_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
end
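
The listing above pairs every item with its index up front and unpacks each pair inside the worker block. A small standard-library illustration of the pair shape follows, with sample data chosen only for the example.

```ruby
letters = %w[a b c]
pairs = letters.each_with_index.to_a
# pairs is [["a", 0], ["b", 1], ["c", 2]]

# Each pair is unpacked into (item, index) before reaching the caller's block.
labels = pairs.map { |pair| "#{pair[1]}:#{pair[0]}" }
```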

.none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

Parallel none?, the logical complement of any?. Returns true only if no block invocation returns truthy.

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 107

def self.none?(collection, concurrency: Etc.nprocessors, &block)
  !any?(collection, concurrency: concurrency, &block)
end

.partition(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array<Array>

Parallel partition. Evaluates the block on every element and returns `[truthy_items, falsy_items]`, preserving input order within each array.

Parameters:

  • collection (Enumerable)

    items to partition

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Array<Array>)

    a two-element array `[truthy, falsy]`



# File 'lib/philiprehberger/parallel_each.rb', line 156

def self.partition(collection, concurrency: Etc.nprocessors, &block)
  return collection.partition(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  truthy = results.select(&:value).map { |r| arr[r.index] }
  falsy  = results.reject(&:value).map { |r| arr[r.index] }
  [truthy, falsy]
end
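
The listing above rebuilds both halves from result objects that expose index and value, looking each original item up by index. The sketch below replays that step with a hypothetical Struct in place of whatever WorkerPool#run actually returns.

```ruby
# Hypothetical stand-in for the objects returned by WorkerPool#run; the
# source above only shows that they respond to #index and #value.
result = Struct.new(:index, :value)

arr = [10, 15, 20, 25]
results = arr.each_with_index.map { |n, i| result.new(i, (n % 10).zero?) }

# The block's verdict selects the half; the index recovers the original item.
truthy = results.select(&:value).map { |r| arr[r.index] }
falsy  = results.reject(&:value).map { |r| arr[r.index] }
```

Looking items up by index means the returned arrays hold the original elements, not the block's return values.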

.reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object

Sequential reduction over the collection.

Parameters:

  • collection (Enumerable)

    items to reduce

  • initial (Object)

    initial accumulator value

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    unused, accepted for API consistency

Yields:

  • (accumulator, item)

    block that returns the new accumulator

Returns:

  • (Object)

    final accumulator value



# File 'lib/philiprehberger/parallel_each.rb', line 129

def self.reduce(collection, initial, concurrency: Etc.nprocessors, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end
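
Since the fold runs sequentially and concurrency is ignored, any speedup has to come from work done before the reduction. One possible arrangement, sketched here with plain threads standing in for a parallel map, computes the per-item values concurrently and then folds them in order.

```ruby
# Plain threads stand in for a parallel map; the fold itself stays sequential.
squares = [1, 2, 3, 4].map { |n| Thread.new { n * n } }.map(&:value)
total = squares.reduce(0) { |sum, sq| sum + sq }
```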

.reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel reject (inverse of select) that preserves input order.

Parameters:

  • collection (Enumerable)

    items to filter

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to reject the item

Returns:

  • (Array)

    items for which the block returned falsy, in original order



# File 'lib/philiprehberger/parallel_each.rb', line 140

def self.reject(collection, concurrency: Etc.nprocessors, &block)
  return collection.reject(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.reject(&:value).map { |r| arr[r.index] }
end

.select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel select (filter) that preserves input order.

Parameters:

  • collection (Enumerable)

    items to filter

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to keep the item

Returns:

  • (Array)

    filtered items in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 56

def self.select(collection, concurrency: Etc.nprocessors, &block)
  return collection.select(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.select(&:value).map { |r| arr[r.index] }
end