# philiprehberger-batch

Batch processing toolkit with chunking, progress tracking, and error collection.
## Requirements

- Ruby >= 3.1
## Installation

Add to your Gemfile:

```ruby
gem "philiprehberger-batch"
```

Or install directly:

```sh
gem install philiprehberger-batch
```
## Usage

```ruby
require "philiprehberger/batch"

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

puts result.processed # => number of successfully processed items
puts result.success?  # => true if no errors occurred
```
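To build intuition for what `process` does, chunked iteration of this kind can be modeled with `Enumerable#each_slice` from the Ruby standard library. The `process_in_chunks` helper below is a hypothetical, illustrative sketch of the basic loop only; the gem layers progress, error collection, and result aggregation on top of it:

```ruby
# Illustrative sketch: split a collection into fixed-size chunks and
# yield each item, counting successes. NOT the gem's implementation.
def process_in_chunks(collection, size:)
  processed = 0
  collection.each_slice(size) do |chunk|
    chunk.each do |item|
      yield item
      processed += 1
    end
  end
  processed
end

saved = []
count = process_in_chunks(1..120, size: 50) { |n| saved << n }
count      # => 120 (three chunks: 50 + 50 + 20 items)
saved.size # => 120
```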
## Progress Tracking

```ruby
result = Philiprehberger::Batch.process(items, size: 100) do |batch|
  batch.each { |item| process(item) }
  batch.on_progress do |info|
    puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
  end
end
```
## Top-level Progress Callback

Pass `on_progress:` at the call site to subscribe without touching every chunk:

```ruby
progress = ->(info) { puts "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }

Philiprehberger::Batch.process(items, size: 100, on_progress: progress) do |batch|
  batch.each { |item| process(item) }
end
```
## Error Collection

```ruby
result = Philiprehberger::Batch.process(jobs, size: 25) do |batch|
  batch.each { |job| job.execute! }
  batch.on_error { |item, err| log_error(item, err) }
end

result.errors.each do |entry|
  puts "Failed: #{entry[:item]} - #{entry[:error].message}"
end
```
## Early Termination

Return `:halt` from the error callback to stop processing early:

```ruby
result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.on_error { |_item, _err| :halt }
  batch.each { |item| risky_operation(item) }
end

result.halted? # => true if processing stopped early
```
## Retry Per Chunk

```ruby
result = Philiprehberger::Batch.process(items, size: 100, retries: 2) do |batch|
  batch.each { |item| unreliable_api_call(item) }
end
```
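One plausible interpretation of `retries: 2` is that a chunk which raises is re-run in full up to two extra times before the failure is recorded. The `run_chunk_with_retries` helper below is a hypothetical sketch of that policy, not the gem's internals:

```ruby
# Illustrative sketch: retry a whole chunk when any item raises,
# up to `retries` additional attempts. NOT the gem's implementation.
def run_chunk_with_retries(chunk, retries:)
  attempts = 0
  begin
    chunk.each { |item| yield item }
  rescue => e
    attempts += 1
    retry if attempts <= retries
    raise e
  end
end

calls = 0
run_chunk_with_retries([1, 2, 3], retries: 2) do |item|
  calls += 1
  raise "flaky" if item == 2 && calls < 5
end
calls # => 7 (two failed passes of 2 calls each, then a full pass of 3)
```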
## Result Aggregation

Block return values are collected per item and can be aggregated on the result:

```ruby
result = Philiprehberger::Batch.process(users, size: 50) do |batch|
  batch.each { |user| user.active? ? :active : :inactive }
end

result.counts                         # => { active: 42, inactive: 8 }
result.flat_map { |status| [status] } # => [:active, :active, :inactive, ...]
result.group_by { |status| status }   # => { active: [...], inactive: [...] }
```
## Success Rate

```ruby
result = Philiprehberger::Batch.process(jobs, size: 50) do |batch|
  batch.each { |job| job.execute! }
end

result.success_rate # => 0.0..1.0 ratio of processed to total (1.0 when total is 0)
puts "#{(result.success_rate * 100).round(1)}% succeeded"
```
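The arithmetic behind `success_rate` is simple: processed divided by total, with an empty collection treated as fully successful rather than dividing by zero. A minimal sketch (the `success_rate` helper here is illustrative, not the gem's method):

```ruby
# Illustrative sketch of the success_rate arithmetic described above.
def success_rate(processed, total)
  return 1.0 if total.zero? # empty batch counts as fully successful
  processed.fdiv(total)     # float division, e.g. 45 / 50 => 0.9
end

success_rate(45, 50) # => 0.9
success_rate(0, 0)   # => 1.0
```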
## Timing Statistics

```ruby
result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

stats = result.timing
puts stats[:total]         # => overall elapsed time in seconds
puts stats[:per_chunk]     # => average time per chunk
puts stats[:per_item]      # => average time per item
puts stats[:fastest_chunk] # => shortest chunk duration
puts stats[:slowest_chunk] # => longest chunk duration
```
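Each timing field can be derived from a list of per-chunk durations. The `timing_stats` helper below is a hypothetical sketch of that arithmetic, assuming the gem records one duration per chunk internally:

```ruby
# Illustrative sketch: derive the timing hash from per-chunk
# durations (in seconds) and the total item count.
def timing_stats(chunk_durations, item_count)
  total = chunk_durations.sum
  {
    total: total,
    per_chunk: total / chunk_durations.length,
    per_item: total / item_count,
    fastest_chunk: chunk_durations.min,
    slowest_chunk: chunk_durations.max
  }
end

stats = timing_stats([1.0, 2.0, 3.0], 300)
stats[:total]     # => 6.0
stats[:per_chunk] # => 2.0
stats[:per_item]  # => 0.02
```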
## Concurrency

```ruby
result = Philiprehberger::Batch.process(records, size: 100, concurrency: 4) do |batch|
  batch.each { |record| api_call(record) }
end

result.processed # => total successful across all threads
result.results   # => collected in chunk order
```
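Processing chunks on a thread pool while still returning results in chunk order, as `result.results` promises, can be sketched with core-Ruby threads and a `Queue`. The `process_concurrently` helper below is illustrative only and mirrors the described behavior, not the gem's actual code:

```ruby
# Illustrative sketch: a fixed pool of worker threads drains a queue
# of (chunk, index) jobs; writing into a preallocated array by index
# preserves chunk order regardless of which thread finishes first.
def process_concurrently(collection, size:, concurrency:)
  chunks = collection.each_slice(size).to_a
  results = Array.new(chunks.length)
  queue = Queue.new
  chunks.each_with_index { |chunk, i| queue << [chunk, i] }
  concurrency.times { queue << nil } # one stop signal per worker

  workers = Array.new(concurrency) do
    Thread.new do
      while (job = queue.pop)
        chunk, index = job
        results[index] = chunk.map { |item| yield item }
      end
    end
  end
  workers.each(&:join)
  results
end

out = process_concurrently(1..10, size: 3, concurrency: 2) { |n| n * 2 }
out.flatten # => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```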
## API

| Method / Class | Description |
|---|---|
| `.process(collection, size:, concurrency:, retries:, on_progress:) { \|batch\| ... }` | Process the collection in chunks |
| `Chunk#each { \|item\| ... }` | Iterate over the items in the current chunk |
| `Chunk#on_progress { \|info\| ... }` | Register a per-chunk progress callback |
| `Chunk#on_error { \|item, err\| ... }` | Register an error callback; return `:halt` to stop early |
| `Result#processed` | Number of successfully processed items |
| `Result#errors` | Array of error hashes |
| `Result#total` | Total number of items |
| `Result#chunks` | Number of chunks processed |
| `Result#elapsed` | Elapsed time in seconds |
| `Result#success?` | True if no errors occurred |
| `Result#halted?` | True if processing was halted early |
| `Result#results` | Array of collected return values |
| `Result#flat_map { \|r\| ... }` | Flat-map over the collected return values |
| `Result#counts` | Hash counting occurrences of each result value |
| `Result#group_by { \|r\| ... }` | Group collected return values by the block's result |
| `Result#success_rate` | Ratio of processed to total as a Float in [0.0, 1.0] (1.0 when empty) |
| `Result#timing` | Hash of timing stats: `total`, `per_chunk`, `per_item`, `fastest_chunk`, `slowest_chunk` |
## Development

```sh
bundle install
bundle exec rspec
bundle exec rubocop
```
## Support

If you find this project useful: