philiprehberger-parallel_each


Parallel iteration with a configurable thread pool and ordered results.

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-parallel_each"

Or install directly:

gem install philiprehberger-parallel_each

Usage

require "philiprehberger/parallel_each"

# Parallel map (results preserve input order)
results = Philiprehberger::ParallelEach.map(urls, concurrency: 8) do |url|
  fetch(url)
end
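To illustrate what "results preserve input order" means, here is a minimal sketch of an ordered parallel map built only on Ruby's standard library. It is not the gem's implementation: `ordered_parallel_map` is a hypothetical helper, and the queue-of-jobs design is an assumption about one reasonable way to get this behavior.

```ruby
require "etc"

# Hypothetical helper (not part of the gem): maps items on a fixed number of
# worker threads and writes each result back to its input position.
def ordered_parallel_map(items, concurrency: Etc.nprocessors, &block)
  items = items.to_a
  results = Array.new(items.size)

  # Each job carries its index so the result can be stored in order.
  jobs = Queue.new
  items.each_with_index { |item, index| jobs << [item, index] }
  jobs.close # once closed and empty, pop returns nil and the workers stop

  worker_count = [[concurrency, items.size].min, 1].max
  workers = Array.new(worker_count) do
    Thread.new do
      while (job = jobs.pop)
        item, index = job
        results[index] = block.call(item) # every job writes a distinct slot
      end
    end
  end
  workers.each(&:join)
  results
end

squares = ordered_parallel_map([3, 1, 2], concurrency: 2) do |n|
  sleep(n * 0.01) # later items may finish before earlier ones
  n * n
end
# squares => [9, 1, 4]
```

Because each result is stored by index rather than appended on completion, the output order does not depend on which block finishes first.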

Parallel Each

Philiprehberger::ParallelEach.each(items, concurrency: 4) do |item|
  process(item)
end

Parallel Select and Reject

even = Philiprehberger::ParallelEach.select(numbers, concurrency: 4, &:even?)
odd = Philiprehberger::ParallelEach.reject(numbers, concurrency: 4, &:even?)

Parallel Find

admin = Philiprehberger::ParallelEach.find(users, concurrency: 4, &:admin?)

Parallel Flat Map

pairs = Philiprehberger::ParallelEach.flat_map(records, concurrency: 4) do |r|
  [r.id, r.name]
end

Map and Each with Index

# map_with_index passes (item, index) to the block
labeled = Philiprehberger::ParallelEach.map_with_index(items, concurrency: 4) do |item, idx|
  "#{idx}: #{item}"
end

# each_with_index for side effects with index access
Philiprehberger::ParallelEach.each_with_index(items, concurrency: 4) do |item, idx|
  puts "Processing item #{idx}: #{item}"
end

Short-Circuit Methods

has_admin = Philiprehberger::ParallelEach.any?(users, concurrency: 4, &:admin?)
all_valid = Philiprehberger::ParallelEach.all?(users, concurrency: 4, &:valid?)
no_errors = Philiprehberger::ParallelEach.none?(records, concurrency: 4, &:invalid?)
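As a rough illustration of what short-circuiting can look like with worker threads, the sketch below stops handing out work once one block returns a truthy value. `parallel_any?` is a hypothetical stand-alone helper; whether the gem uses a shared flag like this is an assumption.

```ruby
# Hypothetical helper (not part of the gem): returns true as soon as any
# block call is truthy, and stops pulling new items after that.
def parallel_any?(items, concurrency: 4, &block)
  jobs = Queue.new
  items.each { |item| jobs << [item] } # wrap so nil/false items are still jobs
  jobs.close

  # The flag only ever changes from false to true, so a stale read costs at
  # most one extra block call per worker.
  found = false
  workers = Array.new([concurrency, 1].max) do
    Thread.new do
      while !found && (job = jobs.pop)
        found = true if block.call(job.first)
      end
    end
  end
  workers.each(&:join)
  found
end

parallel_any?([1, 2, 3], concurrency: 2) { |n| n > 2 } # => true
parallel_any?([1, 2, 3], concurrency: 2) { |n| n > 5 } # => false
```

Blocks that are already running when a match is found still run to completion; only items that have not been started are skipped.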

Count and Reduce

even_count = Philiprehberger::ParallelEach.count(numbers, concurrency: 4, &:even?)

total = Philiprehberger::ParallelEach.reduce([1, 2, 3, 4], 0) { |acc, item| acc + item }

Concurrency

All methods accept a concurrency: keyword argument that controls the thread pool size. It defaults to Etc.nprocessors (the number of available CPU cores).

# Use 2 threads
Philiprehberger::ParallelEach.map(items, concurrency: 2) { |i| i * 2 }

# Use all available cores (default)
Philiprehberger::ParallelEach.map(items) { |i| i * 2 }
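The effect of a pool size can be observed with a small stand-alone sketch that records how many blocks are running at the same time. `peak_parallelism` is a hypothetical helper written for this illustration and is not part of the gem; it only assumes, as stated above, that the default is `Etc.nprocessors`.

```ruby
require "etc"

# Hypothetical helper (not part of the gem): runs the block for every item on
# `concurrency` threads and returns the highest number of blocks that were
# in flight at once.
def peak_parallelism(items, concurrency: Etc.nprocessors, &block)
  jobs = Queue.new
  items.each { |item| jobs << [item] } # wrap so nil/false items are still jobs
  jobs.close

  lock = Mutex.new
  running = 0
  peak = 0

  workers = Array.new([concurrency, 1].max) do
    Thread.new do
      while (job = jobs.pop)
        lock.synchronize do
          running += 1
          peak = running if running > peak
        end
        block.call(job.first)
        lock.synchronize { running -= 1 }
      end
    end
  end
  workers.each(&:join)
  peak
end

peak = peak_parallelism(1..10, concurrency: 2) { sleep 0.01 }
# peak is never greater than 2, whatever the number of items
```

For blocks that mostly wait on I/O, a pool larger than the core count can still help, since waiting threads do not occupy a core; for CPU-heavy blocks on CRuby, the global VM lock limits how much extra threads gain.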

Error Handling

If any block raises an exception, the first error is re-raised after all threads finish:

begin
  Philiprehberger::ParallelEach.map(items, concurrency: 4) do |item|
    raise ArgumentError, 'invalid' if item.nil?

    transform(item)
  end
rescue ArgumentError => e
  puts e.message # => "invalid"
end
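The "first error is re-raised after all threads finish" behavior can be sketched with the standard library alone. The helper below is hypothetical and not the gem's code. It also assumes that the remaining items are still processed after a failure, which the description above does not state explicitly.

```ruby
# Hypothetical helper (not part of the gem): runs the block for every item,
# remembers the first exception, and re-raises it once all workers have joined.
def each_collecting_first_error(items, concurrency: 4, &block)
  jobs = Queue.new
  items.each { |item| jobs << [item] } # wrap so nil/false items are still jobs
  jobs.close

  lock = Mutex.new
  first_error = nil

  workers = Array.new([concurrency, 1].max) do
    Thread.new do
      while (job = jobs.pop)
        begin
          block.call(job.first)
        rescue StandardError => e
          lock.synchronize { first_error ||= e } # keep only the first one
        end
      end
    end
  end
  workers.each(&:join)

  raise first_error if first_error

  items
end

processed = Queue.new
begin
  each_collecting_first_error([1, nil, 3], concurrency: 2) do |item|
    raise ArgumentError, "invalid" if item.nil?

    processed << item
  end
rescue ArgumentError => e
  puts e.message # prints "invalid"
end
```

Rescuing inside each worker keeps one failing item from killing its thread, so the caller sees a single exception only after the whole batch has been attempted.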

API

Method                                                                      Description
ParallelEach.map(collection, concurrency:) { |item| ... }                   Parallel map; results preserve input order
ParallelEach.each(collection, concurrency:) { |item| ... }                  Parallel each
ParallelEach.select(collection, concurrency:) { |item| ... }                Parallel select
ParallelEach.reject(collection, concurrency:) { |item| ... }                Parallel reject
ParallelEach.flat_map(collection, concurrency:) { |item| ... }              Parallel flat map
ParallelEach.find(collection, concurrency:) { |item| ... }                  Parallel find
ParallelEach.any?(collection, concurrency:) { |item| ... }                  Short-circuit any?
ParallelEach.all?(collection, concurrency:) { |item| ... }                  Short-circuit all?
ParallelEach.none?(collection, concurrency:) { |item| ... }                 Short-circuit none?
ParallelEach.map_with_index(collection, concurrency:) { |item, idx| ... }   Parallel map; passes (item, index) to the block
ParallelEach.each_with_index(collection, concurrency:) { |item, idx| ... }  Parallel each for side effects with index access
ParallelEach.count(collection, concurrency:) { |item| ... }                 Parallel count
ParallelEach.reduce(collection, initial, concurrency:) { |acc, item| ... }  Reduce with an initial value

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT