philiprehberger-retry_queue


Batch processor with per-item retry, backoff, and dead-letter collection.

Requirements

  • Ruby >= 3.1

Installation

Add to your Gemfile:

gem "philiprehberger-retry_queue"

Or install directly:

gem install philiprehberger-retry_queue

Usage

require "philiprehberger/retry_queue"

result = Philiprehberger::RetryQueue.process(items, max_retries: 3) do |item|
  process_item(item)
end

puts result.succeeded.size  # => number of successful items
puts result.failed.size     # => number of failed items
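
To make the flow concrete, here is a minimal, self-contained sketch of a per-item retry loop. It is not the gem's implementation: `process_with_retry` is a hypothetical stand-in, it returns a plain hash instead of a Result object, and it assumes `max_retries` counts retries in addition to the first attempt.

```ruby
# Hypothetical stand-in written only to illustrate per-item retry.
# None of these names come from the gem's source.
def process_with_retry(items, max_retries: 3)
  succeeded = []
  failed = []

  items.each do |item|
    attempts = 0
    begin
      attempts += 1
      yield item
      succeeded << item
    rescue StandardError => e
      # Assumption: max_retries is the number of retries after the first attempt.
      retry if attempts <= max_retries
      # Retries exhausted: record the item in the dead-letter list.
      failed << { item: item, error: e, attempts: attempts }
    end
  end

  { succeeded: succeeded, failed: failed }
end

calls = Hash.new(0)
result = process_with_retry([1, 2, 3], max_retries: 2) do |n|
  calls[n] += 1
  raise ArgumentError, "always fails" if n == 2
  raise "flaky" if n == 3 && calls[n] < 2
end

p result[:succeeded]                    # [1, 3]
p result[:failed].map { |f| f[:item] }  # [2]
p result[:failed].first[:attempts]      # 3
```

One failing item does not stop the batch: item 2 ends up in the failed list while items 1 and 3 still succeed.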

Custom Backoff

result = Philiprehberger::RetryQueue.process(items, max_retries: 5, backoff: ->(n) { n * 0.5 }) do |item|
  external_api_call(item)
end
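
The `backoff:` option takes any callable, so different delay curves can be swapped in. The sketch below compares three of them without calling the gem. It assumes the argument is the attempt number starting at 1 and that the return value is a delay in seconds; this README states neither, so check the gem's source before relying on it. The lambda names are illustrative.

```ruby
# Each lambda maps an attempt number to a delay.
# Assumptions: attempts start at 1, delays are in seconds.
linear      = ->(n) { n * 0.5 }
exponential = ->(n) { 0.25 * (2**(n - 1)) }
capped      = ->(n) { [exponential.call(n), 2.0].min }

(1..5).each do |attempt|
  puts format("attempt %d: linear=%.2f exponential=%.2f capped=%.2f",
              attempt,
              linear.call(attempt),
              exponential.call(attempt),
              capped.call(attempt))
end
```

A capped curve is often a reasonable default for external APIs, since it backs off quickly at first but never waits longer than the chosen ceiling.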

Selective Retry

require "net/http" # loads Net::OpenTimeout and Timeout::Error

result = Philiprehberger::RetryQueue.process(items, max_retries: 3, retry_on: [Net::OpenTimeout, Timeout::Error]) do |item|
  api_call(item)
end

# Only Net::OpenTimeout and Timeout::Error trigger retries
# All other errors send the item straight to failed
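
One plausible way to implement this kind of filter is an `is_a?` check against each listed class, which also matches subclasses. The helper below is a hypothetical sketch of that idea, not the gem's internal code; whether the gem matches subclasses is an assumption.

```ruby
require "timeout"

# Hypothetical helper: returns true when the error is an instance of
# any listed class or of one of its subclasses.
def retryable?(error, retry_on)
  retry_on.any? { |klass| error.is_a?(klass) }
end

retry_on = [Timeout::Error, IOError]

p retryable?(Timeout::Error.new, retry_on)        # true
p retryable?(EOFError.new, retry_on)              # true, EOFError is a subclass of IOError
p retryable?(ArgumentError.new("bad"), retry_on)  # false
```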

Retry Hooks

logger_hook = ->(item, error, attempt) { puts "Retrying #{item}: #{error.message} (attempt #{attempt})" }
metrics_hook = ->(item, _error, _attempt) { increment_counter("retry.#{item}") }

result = Philiprehberger::RetryQueue.process(items, max_retries: 3, on_retry: [logger_hook, metrics_hook]) do |item|
  process_item(item)
end

Dead-letter Notifications

on_failure = ->(item, error) { Rails.logger.error("Dead-lettered #{item}: #{error.message}") }

result = Philiprehberger::RetryQueue.process(items, max_retries: 3, on_failure: on_failure) do |item|
  process_item(item)
end

The hook fires once per item that exhausts its retries, just as the item is recorded in Result#failed. Exceptions raised inside the hook are swallowed so a faulty callback cannot break the queue.
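
The swallowing behavior described above can be pictured as a small wrapper around the callback. This is a sketch under assumptions: `notify_failure` is a hypothetical name, and the gem may rescue a different set of exception classes.

```ruby
# Hypothetical wrapper: call the hook if one is configured and
# discard any StandardError it raises.
def notify_failure(hook, item, error)
  hook&.call(item, error)
rescue StandardError
  nil
end

seen = []
good_hook = ->(item, error) { seen << "#{item}: #{error.message}" }
bad_hook  = ->(_item, _error) { raise "logger is down" }

notify_failure(good_hook, :job_1, RuntimeError.new("boom"))
notify_failure(bad_hook, :job_2, RuntimeError.new("boom"))  # does not raise
notify_failure(nil, :job_3, RuntimeError.new("boom"))       # no hook configured

p seen  # ["job_1: boom"]
```

Because hook errors are discarded, a callback that must not fail silently should do its own error reporting.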

DLQ Reprocessing

result = Philiprehberger::RetryQueue.process(jobs, max_retries: 2) do |job|
  job.execute!
end

reprocessed = result.reprocess_failed do |item, error|
  fallback_handler(item, error)
end

puts reprocessed.succeeded.size  # => items recovered during reprocessing
puts reprocessed.failed.size     # => items that failed reprocessing too
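
Before reprocessing, it can help to see which errors dominate the dead-letter list. The sketch below groups entries by error class. It uses hand-written sample data shaped like the `Result#failed` hashes described in the API section (`:item`, `:error`, `:attempts`) rather than a real Result.

```ruby
# Sample data only; a real run would read this from result.failed.
failed = [
  { item: "order-1", error: IOError.new("connection reset"), attempts: 3 },
  { item: "order-2", error: ArgumentError.new("missing id"), attempts: 1 },
  { item: "order-3", error: IOError.new("connection reset"), attempts: 3 }
]

# Group dead-lettered entries by the class of the error that caused them.
by_error = failed.group_by { |entry| entry[:error].class }

by_error.each do |klass, entries|
  puts "#{klass}: #{entries.map { |e| e[:item] }.join(', ')}"
end
```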

Statistics

result = Philiprehberger::RetryQueue.process(records, max_retries: 3) do |record|
  save(record)
end

stats = result.stats
# => { total: 100, succeeded: 97, failed: 3, success_rate: 0.97, elapsed: 1.23 }
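
The stats hash is plain data, so it is easy to turn into a log line or a health check. The sketch below reuses the sample hash shown above. It assumes `:elapsed` is in seconds, which this README does not state.

```ruby
# Sample hash copied from the example above.
stats = { total: 100, succeeded: 97, failed: 3, success_rate: 0.97, elapsed: 1.23 }

# For these numbers, success_rate matches succeeded / total.
derived_rate = stats[:succeeded].fdiv(stats[:total])

# Assumption: elapsed is measured in seconds.
summary = format("%d/%d succeeded (%.1f%%) in %.2fs",
                 stats[:succeeded], stats[:total],
                 stats[:success_rate] * 100, stats[:elapsed])

puts summary
puts "success rate below threshold" if stats[:success_rate] < 0.99
```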

API

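| Method | Description |
|--------|-------------|
| `.process(items, max_retries:, concurrency:, backoff:, retry_on:, on_retry:, on_failure:) { \|item\| ... }` | Process each item through the block with per-item retry; returns a `Result` |
| `on_failure:` | Callable `(item, error)` invoked once per item that exhausts retries; hook errors are swallowed |
| `Result#succeeded` | Array of successfully processed items |
| `Result#failed` | Array of hashes with `:item`, `:error`, `:attempts` |
| `Result#stats` | Hash with `:total`, `:succeeded`, `:failed`, `:success_rate`, `:elapsed` |
| `Result#reprocess_failed { \|item, error\| ... }` | Run the block for each failed item and its error; returns a new `Result` |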

Development

bundle install
bundle exec rspec
bundle exec rubocop

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT