philiprehberger-parallel_each
Parallel iteration with configurable thread pool and ordered results
Requirements
- Ruby >= 3.1
Installation
Add to your Gemfile:
gem "philiprehberger-parallel_each"
Or install directly:
gem install philiprehberger-parallel_each
Usage
require "philiprehberger/parallel_each"
# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
fetch(url)
end
Parallel Each
Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
process(item)
end
Parallel Select and Reject
even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)
odd = Philiprehberger::ParallelEach.reject(numbers, concurrency: 4, &:even?)
Parallel Find
admin = Philiprehberger::ParallelEach.find(users, concurrency: 4, &:admin?)
Parallel Flat Map
pairs = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
[r.id, r.name]
end
Map and Each with Index
# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
"#{idx}: #{item}"
end
# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
puts "Processing item #{idx}: #{item}"
end
Short-Circuit Methods
has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.all?(users, concurrency: 4, &:valid?)
no_errors = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)
Count and Reduce
even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)
total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }
Concurrency
All methods accept a concurrency: keyword argument that controls the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).
# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }
# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }
Error Handling
If any block raises an exception, the first error is re-raised after all threads finish:
begin
Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
raise ArgumentError, 'invalid' if item.nil?
transform(item)
end
rescue ArgumentError => e
puts e. # => "invalid"
end
API
| Method | Description |
|---|---|
| `ParallelEach.map(collection, concurrency:) { \ | item\ |
| `ParallelEach.each(collection, concurrency:) { \ | item\ |
| `ParallelEach.select(collection, concurrency:) { \ | item\ |
| `ParallelEach.reject(collection, concurrency:) { \ | item\ |
| `ParallelEach.flat_map(collection, concurrency:) { \ | item\ |
| `ParallelEach.find(collection, concurrency:) { \ | item\ |
| `ParallelEach.any?(collection, concurrency:) { \ | item\ |
| `ParallelEach.all?(collection, concurrency:) { \ | item\ |
| `ParallelEach.none?(collection, concurrency:) { \ | item\ |
| `ParallelEach.map_with_index(collection, concurrency:) { \ | item, idx\ |
| `ParallelEach.each_with_index(collection, concurrency:) { \ | item, idx\ |
| `ParallelEach.count(collection, concurrency:) { \ | item\ |
| `ParallelEach.reduce(collection, initial, concurrency:) { \ | acc, item\ |
Development
bundle install
bundle exec rspec
bundle exec rubocop
Support
If you find this project useful: