philiprehberger-batch


Batch processing toolkit with chunking, progress tracking, and error collection

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-batch"

Or install directly:

gem install philiprehberger-batch

Usage

require "philiprehberger/batch"

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

puts result.processed  # => number of successful items
puts result.success?   # => true if no errors
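Conceptually, splitting a collection into fixed-size batches is what Ruby's Enumerable#each_slice does. A minimal sketch of the chunking idea (not the gem's internal implementation):

```ruby
# 120 records split into batches of 50: two full chunks plus a remainder.
records = (1..120).to_a
chunks = records.each_slice(50).to_a

chunks.length       # => 3
chunks.last.length  # => 20
```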

Progress Tracking

result = Philiprehberger::Batch.process(items, size: 100) do |batch|
  batch.each { |item| process(item) }
  batch.on_progress do |info|
    puts "Chunk #{info[:chunk_index] + 1}/#{info[:total_chunks]} - #{info[:percentage]}%"
  end
end

Top-level Progress Callback

Pass on_progress: at the call site to receive progress updates without registering a callback inside each chunk:

progress = ->(info) { puts "#{info[:percentage]}% (#{info[:processed]}/#{info[:total_items]})" }

Philiprehberger::Batch.process(items, size: 100, on_progress: progress) do |batch|
  batch.each { |item| process(item) }
end

Error Collection

result = Philiprehberger::Batch.process(jobs, size: 25) do |batch|
  batch.each { |job| job.execute! }
  batch.on_error { |item, err| log_error(item, err) }
end

result.errors.each do |entry|
  puts "Failed: #{entry[:item]} - #{entry[:error].message}"
end

Early Termination

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.on_error { |_item, _err| :halt }
  batch.each { |item| risky_operation(item) }
end

result.halted?  # => true if processing stopped early

Retry Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, retries: 2) do |batch|
  batch.each { |item| unreliable_api_call(item) }
end
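With retries: 2, a chunk is attempted up to three times in total before its error is kept. The retry loop can be sketched roughly like this (with_retries is a hypothetical helper, not the gem's implementation):

```ruby
# Re-run the block up to `retries` additional times before giving up.
def with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts <= retries
    raise
  end
end

# Succeeds on the third attempt when retries: 2 allows it.
result = with_retries(2) { |n| raise "flaky" if n < 3; n }
result  # => 3
```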

Result Aggregation

result = Philiprehberger::Batch.process(users, size: 50) do |batch|
  batch.each { |user| user.active? ? :active : :inactive }
end

result.counts                              # => { active: 42, inactive: 8 }
result.flat_map { |status| [status] }      # => [:active, :active, :inactive, ...]
result.group_by { |status| status }        # => { active: [...], inactive: [...] }
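Result#counts behaves like Enumerable#tally over the collected return values, as a point of comparison (this is a sketch of the equivalent stdlib call, not the gem's implementation):

```ruby
# Tallying occurrences of each value, like result.counts above.
statuses = [:active, :active, :inactive]
statuses.tally  # => {active: 2, inactive: 1}
```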

Success Rate

result = Philiprehberger::Batch.process(jobs, size: 50) do |batch|
  batch.each { |job| job.execute! }
end

result.success_rate  # => 0.0..1.0 ratio of processed to total (1.0 when total is 0)
puts "#{(result.success_rate * 100).round(1)}% succeeded"
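The ratio above is simply processed divided by total, with the empty-run special case. A hypothetical helper mirroring the documented behavior:

```ruby
# Ratio of processed items to total; 1.0 when there was nothing to process.
def success_rate(processed, total)
  return 1.0 if total.zero?
  processed.fdiv(total)
end

success_rate(45, 50)  # => 0.9
success_rate(0, 0)    # => 1.0
```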

Timing Statistics

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| save(record) }
end

stats = result.timing
puts stats[:total]          # => overall elapsed time in seconds
puts stats[:per_chunk]      # => average time per chunk
puts stats[:per_item]       # => average time per item
puts stats[:fastest_chunk]  # => shortest chunk duration
puts stats[:slowest_chunk]  # => longest chunk duration
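The stats relate to each other as simple aggregates over per-chunk durations. A sketch with hypothetical numbers (0.8 s, 1.2 s, 1.0 s across three 50-item chunks):

```ruby
durations = [0.8, 1.2, 1.0]  # hypothetical per-chunk durations in seconds
items_per_chunk = 50

total     = durations.sum                               # => 3.0
per_chunk = total / durations.size                      # => 1.0
per_item  = total / (durations.size * items_per_chunk)  # => 0.02
fastest   = durations.min                               # => 0.8
slowest   = durations.max                               # => 1.2
```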

Timeout Per Chunk

result = Philiprehberger::Batch.process(items, size: 100, timeout_per_chunk: 30) do |batch|
  batch.each { |item| slow_external_call(item) }
end

# Chunks that exceed 30 seconds are interrupted. The TimeoutError is captured
# in result.errors; items from that chunk are NOT counted in result.processed.
# Processing continues with the remaining chunks.
timeout_errors = result.errors.select { |e| e[:error].is_a?(Philiprehberger::Batch::TimeoutError) }
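One way to express per-chunk timeouts is Ruby's stdlib Timeout module. This is an illustrative sketch only; the gem may interrupt chunks differently:

```ruby
require "timeout"

# Run a block with a time limit, converting a timeout into a status symbol.
def run_chunk_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
  :ok
rescue Timeout::Error
  :timed_out
end

run_chunk_with_timeout(0.05) { sleep 0.2 }  # => :timed_out
run_chunk_with_timeout(1)    { 1 + 1 }      # => :ok
```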

Filtering Errors by Class

result = Philiprehberger::Batch.process(items, size: 50) do |batch|
  batch.each { |item| item.sync! }
end

timeout_errors = result.filter_errors(Philiprehberger::Batch::TimeoutError)
timeout_errors.each { |e| puts "Chunk timed out: #{e[:item].inspect}" }

arg_errors = result.filter_errors(ArgumentError)
arg_errors.each { |e| puts "Bad argument for #{e[:item]}: #{e[:error].message}" }

Errors for a Specific Item

result = Philiprehberger::Batch.process(records, size: 50) do |batch|
  batch.each { |record| record.save! }
end

result.errors_for(records.first).each do |entry|
  puts "#{entry[:item]} failed: #{entry[:error].message}"
end

Partial Success

result = Philiprehberger::Batch.process([1, 2, 3, 4]) do |batch|
  batch.each { |n| raise "even" if n.even? }
end

result.partial?      # => true (some succeeded, some failed)
result.failed_items  # => [2, 4]
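The partial-success predicate is true only when both successes and failures exist. A hypothetical helper matching the documented semantics:

```ruby
# True for a mixed outcome; false on full success or full failure.
def partial?(processed, failed_count)
  processed.positive? && failed_count.positive?
end

partial?(2, 2)  # => true  (mixed outcome, as in the example above)
partial?(4, 0)  # => false (full success)
partial?(0, 4)  # => false (full failure)
```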

Concurrency

result = Philiprehberger::Batch.process(records, size: 100, concurrency: 4) do |batch|
  batch.each { |record| api_call(record) }
end

result.processed  # => total successful across all threads
result.results    # => collected in chunk order
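Collecting results in chunk order despite concurrent execution can be sketched with plain threads: mapping over the threads in order and joining via Thread#value preserves ordering. This simplified sketch runs one thread per chunk and has no worker-pool cap, unlike the gem's concurrency: option:

```ruby
# One thread per chunk; Thread#value joins and returns each block's result,
# so collecting values in thread order keeps results in chunk order.
chunks  = (1..8).each_slice(2).to_a
threads = chunks.map { |chunk| Thread.new { chunk.map { |n| n * 10 } } }
results = threads.map(&:value)

results  # => [[10, 20], [30, 40], [50, 60], [70, 80]]
```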

API

Method / Class Description
.process(collection, size:, concurrency:, retries:, timeout_per_chunk:, on_progress:) { |batch| ... } Entry point: splits the collection into chunks, yields each chunk, and returns a Result
Batch::TimeoutError Raised internally and captured in Result#errors when a chunk exceeds timeout_per_chunk
Chunk#each { |item| ... } Iterates over the items in the current chunk, collecting return values
Chunk#on_progress { |info| ... } Registers a progress callback receiving chunk_index, total_chunks, and percentage
Chunk#on_error { |item, err| ... } Registers an error handler for failed items; return :halt to stop processing early
Result#processed Number of successfully processed items
Result#errors Array of error hashes
Result#total Total number of items
Result#chunks Number of chunks processed
Result#elapsed Elapsed time in seconds
Result#success? True if no errors occurred
Result#halted? True if processing was halted early
Result#results Array of collected return values
Result#flat_map { |r| ... } Flat-maps over the collected return values
Result#counts Hash counting occurrences of each result value
Result#group_by { |r| ... } Groups the collected return values by the block's result
Result#success_rate Ratio of processed to total as a Float in [0.0, 1.0] (1.0 when empty)
Result#timing Hash of timing stats: total, per_chunk, per_item, fastest_chunk, slowest_chunk
Result#filter_errors(error_class) Array of { item:, error: } hashes where the error is an instance of the given class
Result#errors_for(item) Array of { item:, error: } hashes for a specific item
Result#failed_items Unique items that errored, in first-failure order
Result#partial? True when some items succeeded and some errored (false on full success or full failure)

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT