Module: Philiprehberger::ParallelEach

Defined in:
lib/philiprehberger/parallel_each.rb,
lib/philiprehberger/parallel_each/version.rb,
lib/philiprehberger/parallel_each/worker_pool.rb

Overview

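Parallel iteration with a configurable thread pool and order-preserving results. Every method accepts a concurrency: keyword (default Etc.nprocessors). With concurrency <= 1 the work runs sequentially on the calling thread, and reduce always runs sequentially.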

Defined Under Namespace

Classes: WorkerPool

Constant Summary

VERSION = '0.3.0'


Class Method Details

.all?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

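Parallel all? with short-circuit behavior, implemented as the negation of any? with a negated block. Once any block invocation returns falsy, workers stop taking new items, and the method returns false after invocations already in progress finish.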

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 195

def self.all?(collection, concurrency: Etc.nprocessors, &block)
  return collection.all?(&block) if concurrency <= 1

  !any?(collection, concurrency: concurrency) { |item| !block.call(item) }
end
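
all? relies on De Morgan's law: every item satisfies the predicate exactly when no item fails it. The following is a sequential check of that identity using plain Enumerable, with no library code involved. It includes the empty collection, where both sides are true. The helper name all_via_any is hypothetical.

```ruby
# Sequential check that all?(pred) == !any?(!pred), the identity
# ParallelEach.all? uses when it delegates to any?.
# `all_via_any` is a hypothetical helper for illustration.
def all_via_any(items, &pred)
  !items.any? { |x| !pred.call(x) }
end

even = ->(x) { x.is_a?(Integer) && x.even? }
cases = [[], [2, 4, 6], [2, 3, 6], [nil, 1]]

cases.each do |items|
  direct = items.all?(&even)
  derived = all_via_any(items, &even)
  puts "#{items.inspect}: all?=#{direct}, via any?=#{derived}"
end
```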

.any?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

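Parallel any? with short-circuit behavior. Once any block invocation returns truthy, workers stop taking new items, and the method returns true after invocations already in progress finish. If the block raises a StandardError, the first such error is re-raised after the remaining workers finish.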

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 208

def self.any?(collection, concurrency: Etc.nprocessors, &block)
  return collection.any?(&block) if concurrency <= 1

  queue = Queue.new
  collection.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found = false
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if found

        _idx, item = work

        begin
          if block.call(item)
            mutex.synchronize { found = true }
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found
end
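
The following is a minimal sketch of the same pattern using only core Ruby: a shared Queue, one stop sentinel per worker, a Mutex-guarded "found" flag, and first-error propagation. parallel_any? is a hypothetical helper, not the gem's API. It departs from the listing in two ways. First, it uses a fresh Object as the sentinel, so a real :stop element cannot end a worker early. Second, workers also stop taking new items once an error is recorded.

```ruby
# Sketch of a short-circuiting parallel any? built on Thread, Queue, Mutex.
# `parallel_any?` is hypothetical; it is not part of ParallelEach.
def parallel_any?(items, concurrency: 4, &pred)
  stop = Object.new # unique sentinel; cannot collide with a real item
  queue = Queue.new
  items.each { |item| queue << item }
  concurrency.times { queue << stop } # one sentinel per worker

  found = false
  error = nil
  lock = Mutex.new

  workers = Array.new(concurrency) do
    Thread.new do
      loop do
        item = queue.pop
        break if item.equal?(stop)
        break if lock.synchronize { found || error } # take no new work once settled

        begin
          if pred.call(item)
            lock.synchronize { found = true }
            break
          end
        rescue StandardError => e
          lock.synchronize { error ||= e } # remember only the first error
          break
        end
      end
    end
  end

  workers.each(&:join)
  raise error if error

  found
end

p parallel_any?([1, 3, 5, 8]) { |x| x.even? } # => true
```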

.count(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Integer

Parallel count of the elements for which the block returns truthy.

Parameters:

  • collection (Enumerable)

    items to count

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to count the item

Returns:

  • (Integer)


# File 'lib/philiprehberger/parallel_each.rb', line 101

def self.count(collection, concurrency: Etc.nprocessors, &block)
  results = map(collection, concurrency: concurrency, &block)
  results.count { |r| r }
end
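
count tallies the results of map with results.count { |r| r }, so every truthy block result is counted. In Ruby only nil and false are falsy, which means 0 and "" both count. Here is the same tally step applied to a fixed results array:

```ruby
# The tally step used by ParallelEach.count, applied to fixed block results.
results = [true, nil, false, 0, "", :yes]
truthy = results.count { |r| r }
puts truthy # => 4 (true, 0, "", and :yes are truthy)
```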

.each(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Enumerable

Parallel each that calls the block once for every item. The block's return values are discarded, and the original collection is returned.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block to execute for each item

Returns:

  • (Enumerable)

    the original collection



# File 'lib/philiprehberger/parallel_each.rb', line 29

def self.each(collection, concurrency: Etc.nprocessors, &block)
  map(collection, concurrency: concurrency, &block)
  collection
end
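
each discards block return values, so any result has to come from side effects, and those side effects run on several threads at once. The following minimal sketch shows the usual pattern of guarding shared state with a Mutex. Plain Threads stand in for the library so the example is self-contained, and splitting the work with each_slice is an assumption made only for illustration.

```ruby
# Thread-safe side effects from parallel iteration. Plain threads stand in
# for ParallelEach.each; the Mutex-guarded append is the point.
items = (1..20).to_a
seen = []
lock = Mutex.new

threads = items.each_slice(5).map do |slice|
  Thread.new do
    slice.each do |x|
      lock.synchronize { seen << x * x } # guard the shared array
    end
  end
end
threads.each(&:join)

# Completion order is not guaranteed, so compare sorted values.
puts seen.sort == items.map { |x| x * x }
```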

.each_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Enumerable

Parallel each with index. Passes (item, index) to the block.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item, index)

    block to execute for each item with its index

Returns:

  • (Enumerable)

    the original collection



# File 'lib/philiprehberger/parallel_each.rb', line 79

def self.each_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
  collection
end
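
each_with_index builds (item, index) pairs up front and passes them through map, so each index reflects the item's position in the input rather than the order in which threads run. This is what those pairs look like for a Range, which the method first converts with to_a:

```ruby
# The (item, index) pairs each_with_index builds before handing them to map.
pairs = (10..12).to_a.each_with_index.to_a
p pairs # => [[10, 0], [11, 1], [12, 2]]

# Each pair is unpacked back into (item, index) for the caller's block.
pairs.each { |pair| puts "item=#{pair[0]} index=#{pair[1]}" }
```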

.find(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Object?

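Parallel find with short-circuit behavior. Returns the matching element with the lowest input index among the items evaluated, or nil if none match. In the listing below, a worker checks for an existing match only after dequeuing an item, so with concurrency > 1 it can skip an earlier item that would also have matched. The result is therefore not strictly guaranteed to be the first match in input order. A StandardError raised by the block is re-raised after all workers finish.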

Parameters:

  • collection (Enumerable)

    items to search

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Object, nil)

    the first matching element or nil



# File 'lib/philiprehberger/parallel_each.rb', line 140

def self.find(collection, concurrency: Etc.nprocessors, &block)
  return collection.find(&block) if concurrency <= 1

  queue = Queue.new
  arr = collection.is_a?(Array) ? collection : collection.to_a
  arr.each_with_index { |item, idx| queue << [idx, item] }
  concurrency.times { queue << :stop }

  found_item = nil
  found_index = nil
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new([concurrency, 1].max) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop
        break if mutex.synchronize { !found_item.nil? }

        idx, item = work

        begin
          if block.call(item)
            mutex.synchronize do
              if found_item.nil? || idx < found_index
                found_item = item
                found_index = idx
              end
            end
            break
          end
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
          break
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  found_item
end
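
One way to guarantee the lowest-index match is to skip an item only when its index is greater than the best match found so far. The index, not the item, serves as the "found" marker, so a matching nil element is also handled. The following sketch shows this approach. parallel_find is hypothetical, and error propagation is left out for brevity: an exception in the block ends its thread and is re-raised by join.

```ruby
# Sketch of a parallel find that always returns the lowest-index match.
# `parallel_find` is hypothetical; it is not part of ParallelEach.
def parallel_find(items, concurrency: 4, &pred)
  arr = items.to_a
  stop = Object.new
  queue = Queue.new
  arr.each_index { |i| queue << i } # indices enter the queue in input order
  concurrency.times { queue << stop }

  best = nil # lowest matching index found so far
  lock = Mutex.new

  workers = Array.new(concurrency) do
    Thread.new do
      loop do
        idx = queue.pop
        break if idx.equal?(stop)
        # The queue is FIFO, so every index still waiting is larger than idx.
        # Once idx is past a known match, none of them can win.
        break if lock.synchronize { best && idx > best }

        if pred.call(arr[idx])
          lock.synchronize { best = idx if best.nil? || idx < best }
        end
      end
    end
  end
  workers.each(&:join)

  best && arr[best]
end

p parallel_find(1..100) { |x| x % 7 == 0 } # => 7
```

Earlier items already dequeued are always evaluated, so a slow early match still wins over a fast later one.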

.flat_map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

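Parallel flat_map that preserves input order. With concurrency > 1, each block result is converted with Kernel#Array before flattening, so nil contributes no elements and a Hash contributes its key-value pairs. Enumerable#flat_map, used when concurrency <= 1, appends such values unchanged.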

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns an array (or single value) for each item

Returns:

  • (Array)

    flattened results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 55

def self.flat_map(collection, concurrency: Etc.nprocessors, &block)
  return collection.flat_map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).flat_map { |r| Array(r.value) }
end
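
The difference between the two code paths comes from Kernel#Array. The following comparison uses plain Ruby on the same mixed block results:

```ruby
# Kernel#Array normalization (the parallel path) versus plain
# Enumerable#flat_map (the concurrency <= 1 path) on mixed block results.
values = [[1, 2], 3, nil, { a: 1 }]

normalized = values.flat_map { |v| Array(v) }
plain = values.flat_map { |v| v }

p normalized # => [1, 2, 3, [:a, 1]]
p plain      # nil and the Hash are kept as single elements
```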

.map(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel map that preserves input order.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block to execute for each item

Returns:

  • (Array)

    results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 16

def self.map(collection, concurrency: Etc.nprocessors, &block)
  return collection.map(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  pool.run(collection, &block).map(&:value)
end
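
WorkerPool's internals are not shown here, but one common way to preserve input order is to tag each item with its index and write each result into the matching slot. Completion order then has no effect on the output. The following sketch shows that technique. ordered_parallel_map is hypothetical, and using Queue#close instead of stop sentinels is a design choice made for brevity.

```ruby
# Order-preserving parallel map: every result is stored at its input index,
# so the order in which threads finish does not affect the output.
# `ordered_parallel_map` is hypothetical; WorkerPool's internals are not shown.
def ordered_parallel_map(items, concurrency: 4, &block)
  arr = items.to_a
  results = Array.new(arr.size)
  lock = Mutex.new
  queue = Queue.new
  arr.each_with_index { |item, idx| queue << [idx, item] }
  queue.close # pop returns nil once a closed queue is empty

  workers = Array.new(concurrency) do
    Thread.new do
      while (work = queue.pop)
        idx, item = work
        value = block.call(item)                  # run the block outside the lock
        lock.synchronize { results[idx] = value } # store by input position
      end
    end
  end
  workers.each(&:join)
  results
end

# Larger inputs sleep longer and tend to finish last; output order still matches input.
p ordered_parallel_map([3, 1, 2]) { |x| sleep(0.01 * x); x * x } # => [9, 1, 4]
```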

.map_with_index(collection, concurrency: Etc.nprocessors) {|item, index| ... } ⇒ Array

Parallel map with index. Passes (item, index) to the block.

Parameters:

  • collection (Enumerable)

    items to process

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item, index)

    block to execute for each item with its index

Returns:

  • (Array)

    results in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 68

def self.map_with_index(collection, concurrency: Etc.nprocessors)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  map(arr.each_with_index.to_a, concurrency: concurrency) { |pair| yield(pair[0], pair[1]) }
end
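
map_with_index gets its results from map, which preserves input order. For a block without side effects, its output should therefore match the sequential each_with_index.map. This is the sequential reference for comparison:

```ruby
# Sequential reference for map_with_index: same (item, index) pairs,
# same result order.
words = %w[a b c]
labels = words.each_with_index.map { |word, i| "#{i}:#{word}" }
p labels # => ["0:a", "1:b", "2:c"]
```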

.none?(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Boolean

Parallel none?, the complement of any?: returns true only when the block returns falsy for every item.

Parameters:

  • collection (Enumerable)

    items to test

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy/falsy

Returns:

  • (Boolean)


# File 'lib/philiprehberger/parallel_each.rb', line 91

def self.none?(collection, concurrency: Etc.nprocessors, &block)
  !any?(collection, concurrency: concurrency, &block)
end

.reduce(collection, initial, concurrency: Etc.nprocessors) {|accumulator, item| ... } ⇒ Object

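Reduction over the collection. It always runs sequentially, because each step depends on the accumulator produced by the previous one, so the concurrency argument is ignored.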

Parameters:

  • collection (Enumerable)

    items to reduce

  • initial (Object)

    initial accumulator value

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    unused, accepted for API consistency

Yields:

  • (accumulator, item)

    block that returns the new accumulator

Returns:

  • (Object)

    final accumulator value



# File 'lib/philiprehberger/parallel_each.rb', line 113

def self.reduce(collection, initial, concurrency: Etc.nprocessors, &block)
  items = collection.is_a?(Array) ? collection : collection.to_a
  items.reduce(initial, &block)
end
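
A fold's result can depend on how items are grouped, which is why splitting it across threads is not generally safe. In the following plain-Ruby illustration, chunking a subtraction fold and combining the partial results gives a different answer. An associative operation such as addition gives the same answer either way.

```ruby
# Why a fold is hard to parallelize: regrouping changes the answer
# unless the operation is associative.
items = [1, 2, 3, 4]

sequential = items.reduce(0) { |acc, x| acc - x } # ((((0 - 1) - 2) - 3) - 4)
partials = items.each_slice(2).map { |chunk| chunk.reduce(0) { |acc, x| acc - x } }
chunked = partials.reduce { |a, b| a - b }         # -3 - (-7)

p sequential # => -10
p chunked    # => 4

# Addition is associative, so both strategies agree.
sum_seq = items.reduce(0, :+)
sum_chunked = items.each_slice(2).map { |c| c.reduce(0, :+) }.reduce(:+)
p [sum_seq, sum_chunked] # => [10, 10]
```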

.reject(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel reject (inverse of select) that preserves input order.

Parameters:

  • collection (Enumerable)

    items to filter

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to reject the item

Returns:

  • (Array)

    items for which the block returned falsy, in original order



# File 'lib/philiprehberger/parallel_each.rb', line 124

def self.reject(collection, concurrency: Etc.nprocessors, &block)
  return collection.reject(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.reject(&:value).map { |r| arr[r.index] }
end
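
Because reject is the inverse of select, it keeps exactly the items select would drop. For the same block, the two results together contain every input item once, each in its original order. The following sequential illustration uses plain Enumerable, where partition returns both parts at once:

```ruby
# select and reject split a collection into complementary parts, each in
# input order; Enumerable#partition returns both at once.
items = (1..10).to_a
odd = ->(x) { x.odd? }

kept, dropped = items.partition(&odd)
p kept    # => [1, 3, 5, 7, 9]
p dropped # => [2, 4, 6, 8, 10]
```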

.select(collection, concurrency: Etc.nprocessors) {|item| ... } ⇒ Array

Parallel select (filter) that preserves input order.

Parameters:

  • collection (Enumerable)

    items to filter

  • concurrency (Integer) (defaults to: Etc.nprocessors)

    number of threads (default: number of processors)

Yields:

  • (item)

    block that returns truthy to keep the item

Returns:

  • (Array)

    filtered items in the same order as the input



# File 'lib/philiprehberger/parallel_each.rb', line 40

def self.select(collection, concurrency: Etc.nprocessors, &block)
  return collection.select(&block) if concurrency <= 1

  pool = WorkerPool.new(concurrency: concurrency)
  results = pool.run(collection, &block)
  arr = collection.is_a?(Array) ? collection : collection.to_a
  results.select(&:value).map { |r| arr[r.index] }
end
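
In select and reject, each pool result carries the block's value (the truthy/falsy verdict) and the item's input index. The item itself is recovered through arr[r.index], not through the value. The following sketch mimics that final step. Result is a hypothetical stand-in for WorkerPool's result objects, and the results are assumed to arrive in input order, as the listing's order-preservation promise implies.

```ruby
# Mimics the last step of select: keep results whose value is truthy,
# then map each index back to the original item.
# `Result` is a hypothetical stand-in for WorkerPool's result objects.
Result = Struct.new(:index, :value)

arr = %w[apple kiwi banana fig]
# Block under test: word.length > 4; one result per item, in input order.
results = arr.each_with_index.map { |word, i| Result.new(i, word.length > 4) }

selected = results.select(&:value).map { |r| arr[r.index] }
p selected # => ["apple", "banana"]
```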