Class: Philiprehberger::RetryQueue::Processor

Inherits:
Object
Defined in:
lib/philiprehberger/retry_queue/processor.rb

Overview

Processes items with per-item retry, backoff, and dead letter collection.

Constant Summary

DEFAULT_BACKOFF = ->(attempt) { 0.1 * (2**attempt) }

Default backoff strategy: exponential with base 0.1s.
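
To make the growth of the default strategy concrete, the sketch below copies the documented lambda into a local constant and evaluates it for the first few attempt numbers. The names `EXAMPLE_BACKOFF` and `EXAMPLE_DELAYS` are illustrative only, and whether the processor numbers attempts from 0 or from 1 is not stated on this page, so the range `0..3` is an assumption.

```ruby
# Local copy of the documented default, so this snippet runs without the gem.
EXAMPLE_BACKOFF = ->(attempt) { 0.1 * (2**attempt) }

# Assumption: attempts are numbered from 0; the page does not say.
EXAMPLE_DELAYS = (0..3).map { |attempt| EXAMPLE_BACKOFF.call(attempt) }

puts EXAMPLE_DELAYS.inspect # each delay is double the previous one
```

Under that assumption the delay doubles on every attempt, starting at 0.1 seconds.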

Constructor Details

#initialize(max_retries: 3, concurrency: 1, backoff: nil, retry_on: nil, on_retry: nil, on_failure: nil, jitter: 0.0) ⇒ Processor

Returns a new instance of Processor.

Parameters:

  • max_retries (Integer) (defaults to: 3)

    maximum number of retries per item, not counting the first attempt. `max_retries: 0` means one attempt with no retries (not zero attempts).

  • concurrency (Integer) (defaults to: 1)

    number of concurrent workers (reserved for future use; the value is stored but does not yet affect processing)

  • backoff (Proc, nil) (defaults to: nil)

    proc that receives the attempt number and returns the sleep duration in seconds; `nil` selects DEFAULT_BACKOFF

  • retry_on (Array<Class>, nil) (defaults to: nil)

    exception classes to retry on; nil means retry all

  • on_retry (Array<Proc>, Proc, nil) (defaults to: nil)

    callbacks fired before each retry attempt

  • on_failure (Proc, nil) (defaults to: nil)

    callable invoked with `(item, error)` once per item that exhausts its retries and moves to the dead-letter list. Exceptions raised by the hook are swallowed, so a faulty hook cannot break the queue.

  • jitter (Numeric) (defaults to: 0.0)

    fraction in `0.0..1.0` applied to the computed backoff delay as `delay * (1 + rand * jitter)`. Defaults to `0.0` (no jitter).
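
The jitter formula quoted above can be sketched in isolation. `jittered_delay` is a hypothetical helper, not part of the gem, and its range check is only an assumption about what the gem's private `valid_jitter?` does. The `rng:` keyword is likewise added here so the random source can be seeded.

```ruby
# Hypothetical helper, not part of the gem: applies the documented formula
# delay * (1 + rand * jitter) with an injectable random source.
def jittered_delay(delay, jitter, rng: Random.new)
  # Assumed validation, mirroring the documented 0.0..1.0 constraint.
  unless jitter.is_a?(Numeric) && (0.0..1.0).cover?(jitter)
    raise ArgumentError, 'jitter must be a Numeric in 0.0..1.0'
  end

  delay * (1 + (rng.rand * jitter))
end

puts jittered_delay(0.4, 0.0)                      # jitter 0.0 leaves the delay unchanged
puts jittered_delay(0.4, 0.5, rng: Random.new(42)) # at least 0.4 and below 0.6
```

Because `rand` returns a value in `0.0...1.0`, jitter only ever lengthens the delay: with `jitter: 0.5` the result lies between the base delay and 1.5 times the base delay.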

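Raises:

  • (Error)

    if `max_retries` is not a non-negative Integer

  • (ArgumentError)

    if `jitter` is not a Numeric in `0.0..1.0`
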
# File 'lib/philiprehberger/retry_queue/processor.rb', line 21

def initialize(max_retries: 3, concurrency: 1, backoff: nil, retry_on: nil, on_retry: nil, on_failure: nil,
               jitter: 0.0)
  raise Error, 'max_retries must be non-negative' unless max_retries.is_a?(Integer) && max_retries >= 0
  raise ArgumentError, 'jitter must be a Numeric in 0.0..1.0' unless valid_jitter?(jitter)

  @max_retries = max_retries
  @concurrency = concurrency
  @backoff = backoff || DEFAULT_BACKOFF
  @retry_on = retry_on
  @on_retry_hooks = Array(on_retry)
  @on_failure = on_failure
  @jitter = jitter.to_f
end

Instance Method Details

#call(items) {|item| ... } ⇒ Result

Process a collection of items with retry logic.

Parameters:

  • items (Array)

    items to process

Yields:

  • (item)

    block that processes a single item; raise to signal failure

Returns:

  • (Result)

    processing result with succeeded, failed, and stats

Raises:

  • (Error)

    if no processing block is given

# File 'lib/philiprehberger/retry_queue/processor.rb', line 40

def call(items, &block)
  raise Error, 'a processing block is required' unless block

  succeeded = []
  failed = []
  start_time = now

  items.each do |item|
    process_item(item, succeeded, failed, &block)
  end

  Result.new(succeeded: succeeded, failed: failed, elapsed: now - start_time)
end
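
The private `process_item` step is not shown on this page. As a rough illustration of the documented behavior (per-item retry, backoff between attempts, `retry_on` filtering, and hand-off to a dead-letter list), the stand-alone sketch below may help. `attempt_with_retries`, its return shape, and the `sleeper:` keyword are all hypothetical, and passing a zero-based attempt number to the backoff proc is an assumption.

```ruby
# Hypothetical stand-in for the per-item step; the gem's own process_item is not shown.
# Returns [:ok, value] on success or [:dead, error] once the item cannot be retried.
def attempt_with_retries(item, max_retries:, backoff:, retry_on: nil,
                         sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    [:ok, yield(item)]
  rescue StandardError => e
    # nil retry_on means every error is retryable, as documented above.
    retryable = retry_on.nil? || retry_on.any? { |klass| e.is_a?(klass) }
    return [:dead, e] if !retryable || attempt >= max_retries

    sleeper.call(backoff.call(attempt)) # wait before the next try
    attempt += 1
    retry
  end
end

calls = 0
outcome = attempt_with_retries(:job, max_retries: 2, backoff: ->(_n) { 0 }, sleeper: ->(_s) {}) do |_item|
  calls += 1
  raise 'flaky' if calls < 3

  :done
end
p outcome # the block fails twice, then succeeds on the third attempt
```

With `max_retries: 2` the block runs at most three times, which matches the documented rule that `max_retries: 0` still means one attempt.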