philiprehberger-batch


A batch processing toolkit with chunking, progress tracking, and error collection.

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-batch"

Or install directly:

gem install philiprehberger-batch

Usage

require "philiprehberger/batch"

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

puts result.processed  # => number of successful items
puts result.success?   # => true if no errors
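
For intuition about what `size: 50` does, the chunking step can be approximated with Ruby's built-in `Enumerable#each_slice`. The following is a plain-Ruby sketch, not the gem's implementation; `records` and `saved` are placeholder names invented for the illustration.

```ruby
# Plain-Ruby sketch of chunking; this does not use or mirror the gem's internals.
records = (1..120).to_a # placeholder data
saved = []

chunk_sizes = records.each_slice(50).map do |batch|
  batch.each { |record| saved << record } # stand-in for save(record)
  batch.size
end

p chunk_sizes # => [50, 50, 20]
p saved.size  # => 120
```

The last chunk is simply smaller when the collection size is not a multiple of the chunk size.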

Progress Tracking

result = Philiprehberger::Batch.process(items, size: 100) do |batch|
  batch.each { |item| process(item) }
  batch.on_progress do |info|
    puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
  end
end

Top-level Progress Callback

Pass on_progress: at the call site to subscribe without touching every chunk:

progress = ->(info) { puts "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }

Philiprehberger::Batch.process(items, size: 100, on_progress: progress) do |batch|
  batch.each { |item| process(item) }
end
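
The `info` hash in the examples above carries `:chunk_index`, `:total_chunks`, `:percentage`, `:processed`, and `:total_items`. How the gem computes them is not documented here, so the sketch below is only one plausible reading: `progress_info` is a hypothetical helper, and the rounding to one decimal place is an assumption.

```ruby
# Hypothetical reconstruction of the progress payload. The field names come
# from the README examples; the arithmetic and rounding are assumptions.
def progress_info(chunk_index, chunk_size, total_items)
  total_chunks = (total_items.to_f / chunk_size).ceil
  processed = [(chunk_index + 1) * chunk_size, total_items].min
  {
    chunk_index: chunk_index,
    total_chunks: total_chunks,
    processed: processed,
    total_items: total_items,
    percentage: (processed * 100.0 / total_items).round(1)
  }
end

info = progress_info(1, 100, 250) # second chunk of 250 items
puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
# prints: Chunk 2/3 - 80.0%
```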

Error Collection

result = Philiprehberger::Batch.process(jobs, size: 25) do |batch|
  batch.each { |job| job.execute! }
  batch.on_error { |item, err| log_error(item, err) }
end

result.errors.each do |entry|
  puts "Failed: #{entry[:item]} - #{entry[:error].message}"
end
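
Conceptually, error collection means rescuing per item and recording the failure instead of aborting the run. A minimal plain-Ruby sketch of that idea follows; the `{ item:, error: }` entry shape matches the example above, while the loop itself is an assumption about the behavior, not the gem's code.

```ruby
# Sketch of per-item error collection; only the entry shape is taken from the README.
items = [1, 0, 4, 0, 5]
errors = []
processed = 0

items.each do |item|
  100 / item # raises ZeroDivisionError when item is 0
  processed += 1
rescue StandardError => e
  errors << { item: item, error: e }
end

puts processed # => 3
errors.each { |entry| puts "Failed: #{entry[:item]} - #{entry[:error].message}" }
```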

Early Termination

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.on_error { |_item, _err| :halt }
  batch.each { |item| risky_operation(item) }
end

result.halted?  # => true if processing stopped early
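
The halting behavior can be pictured as a loop that checks the error handler's return value and stops on `:halt`. This is a plain-Ruby sketch under that assumption; the `on_error` lambda and the `halted` flag are illustrative names, not part of the gem.

```ruby
# Sketch of halting on a handler's :halt return value (assumed semantics).
on_error = ->(_item, _err) { :halt } # handler that always asks to stop
items = [3, 2, 0, 5, 7]
processed = []
halted = false

items.each do |item|
  processed << (10 / item) # raises ZeroDivisionError when item is 0
rescue ZeroDivisionError => e
  halted = on_error.call(item, e) == :halt
  break if halted
end

p processed # => [3, 5]
p halted    # => true
```

Items after the failing one (`5` and `7` here) are never attempted once the handler returns `:halt`.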

Retry Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, retries: 2) do |batch|
  batch.each { |item| unreliable_api_call(item) }
end
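
To make the retry idea concrete, here is a plain-Ruby sketch built on the `retry` keyword. It assumes `retries: 2` means up to two additional attempts after the first failure, which the README does not state explicitly; `with_retries` is a hypothetical helper, not a gem method.

```ruby
# Sketch of retrying a unit of work; assumes `retries` counts extra attempts.
def with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts <= retries
    raise # retries exhausted: re-raise the last error
  end
end

# Hypothetical flaky operation: fails twice, succeeds on the third attempt.
outcome = with_retries(2) do |attempt|
  raise "temporary failure" if attempt < 3

  "ok after #{attempt} attempts"
end

puts outcome # => ok after 3 attempts
```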

Result Aggregation

result = Philiprehberger::Batch.process(users, size: 50) do |batch|
  batch.each { |user| user.active? ? :active : :inactive }
end

result.counts                              # => { active: 42, inactive: 8 }
result.flat_map { |status| [status] }      # => [:active, :active, :inactive, ...]
result.group_by { |status| status }        # => { active: [...], inactive: [...] }
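
For readers familiar with Ruby's core library, these aggregations resemble `Enumerable#tally` and `Enumerable#group_by` applied to the collected return values. The sketch below works on a plain array as a stand-in; whether the gem delegates to these methods is an assumption.

```ruby
# Plain-Ruby analogues on an array of collected values (illustrative only).
statuses = %i[active active inactive active]

counts = statuses.tally             # => { active: 3, inactive: 1 }
groups = statuses.group_by(&:itself) # => { active: [...3 items], inactive: [...1 item] }

puts counts[:active]        # => 3
puts groups[:inactive].size # => 1
```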

Concurrency

result = Philiprehberger::Batch.process(records, size: 100, concurrency: 4) do |batch|
  batch.each { |record| api_call(record) }
end

result.processed  # => total successful across all threads
result.results    # => collected in chunk order
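
One way to picture chunk-level concurrency with ordered results is a fixed pool of threads pulling chunks from a queue and writing into a slot reserved for each chunk. This is a plain-Ruby sketch, assuming `concurrency: 4` means at most four worker threads; it is not the gem's implementation, and doubling each record stands in for `api_call`.

```ruby
# Sketch of a small thread pool that keeps results in chunk order.
records = (1..10).to_a
chunks = records.each_slice(3).to_a
results = Array.new(chunks.size) # one slot per chunk preserves ordering

queue = Queue.new
chunks.each_with_index { |chunk, index| queue << [chunk, index] }
queue.close # pop returns nil once the queue is drained

workers = Array.new(4) do
  Thread.new do
    while (job = queue.pop)
      chunk, index = job
      results[index] = chunk.map { |record| record * 2 } # stand-in for api_call
    end
  end
end
workers.each(&:join)

p results.flatten # => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
```

Because each worker writes only to its own chunk's index, the final order does not depend on which thread finishes first.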

API

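| Method / Class | Description |
|----------------|-------------|
| `.process(collection, size:, concurrency:, retries:, on_progress:) { \|batch\| ... }` | Process a collection in chunks and return a `Result` |
| `Chunk#each { \|item\| ... }` | Run the block for each item in the chunk |
| `Chunk#on_progress { \|info\| ... }` | Register a progress callback |
| `Chunk#on_error { \|item, err\| ... }` | Register an error callback; returning `:halt` stops processing |
| `Result#processed` | Number of successfully processed items |
| `Result#errors` | Array of error hashes |
| `Result#total` | Total number of items |
| `Result#chunks` | Number of chunks processed |
| `Result#elapsed` | Elapsed time in seconds |
| `Result#success?` | True if no errors occurred |
| `Result#halted?` | True if processing was halted early |
| `Result#results` | Array of collected return values |
| `Result#flat_map { \|r\| ... }` | Flat-map over the collected return values |
| `Result#counts` | Hash counting occurrences of each result value |
| `Result#group_by { \|r\| ... }` | Group the collected return values by the block's result |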

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT