Class: Philiprehberger::RetryQueue::Processor

Inherits:
Object
Defined in:
lib/philiprehberger/retry_queue/processor.rb

Overview

Processes items with per-item retry, backoff, and dead-letter collection.

Constant Summary

DEFAULT_BACKOFF =

Default backoff strategy: exponential, computed as 0.1 * 2**attempt seconds.

->(attempt) { 0.1 * (2**attempt) }
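To make the growth of the default schedule concrete, the sketch below evaluates the same formula for the first few attempts and shows one possible custom strategy. It assumes attempt numbers start at 0, which this page does not state; the `max_delay` cap and the `capped_backoff` name are hypothetical and not part of the library.

```ruby
# Same formula as DEFAULT_BACKOFF above, copied so the snippet runs standalone.
default_backoff = ->(attempt) { 0.1 * (2**attempt) }

# Assumption: attempts are numbered from 0; the page does not say so.
default_delays = (0..3).map { |attempt| default_backoff.call(attempt) }
# => [0.1, 0.2, 0.4, 0.8]

# Hypothetical custom strategy: the same curve, capped at max_delay seconds.
max_delay = 2.0
capped_backoff = ->(attempt) { [0.1 * (2**attempt), max_delay].min }
capped_delays = (0..6).map { |attempt| capped_backoff.call(attempt) }
```

A proc shaped like `capped_backoff` could be passed as the `backoff:` argument, since the constructor only requires a callable that takes the attempt number and returns a duration.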

Instance Method Summary

Constructor Details

#initialize(max_retries: 3, concurrency: 1, backoff: nil, retry_on: nil, on_retry: nil, on_failure: nil) ⇒ Processor

Returns a new instance of Processor.

Parameters:

  • max_retries (Integer) (defaults to: 3)

    maximum retry attempts per item

  • concurrency (Integer) (defaults to: 1)

    number of concurrent workers (reserved for future use; #call currently processes items sequentially)

  • backoff (Proc, nil) (defaults to: nil)

    proc that receives the attempt number and returns the sleep duration in seconds; DEFAULT_BACKOFF is used when nil

  • retry_on (Array<Class>, nil) (defaults to: nil)

    exception classes to retry on; nil means retry all

  • on_retry (Array<Proc>, Proc, nil) (defaults to: nil)

    a single callback or an array of callbacks fired before each retry attempt

  • on_failure (Proc, nil) (defaults to: nil)

    callable invoked with `(item, error)` once per item that exhausts its retries and moves to the dead-letter list; exceptions raised by the hook are swallowed so a faulty hook cannot break the queue

Raises:

  • (Error)

    if max_retries is not a non-negative Integer

# File 'lib/philiprehberger/retry_queue/processor.rb', line 18

def initialize(max_retries: 3, concurrency: 1, backoff: nil, retry_on: nil, on_retry: nil, on_failure: nil)
  raise Error, 'max_retries must be non-negative' unless max_retries.is_a?(Integer) && max_retries >= 0

  @max_retries = max_retries
  @concurrency = concurrency
  @backoff = backoff || DEFAULT_BACKOFF
  @retry_on = retry_on
  @on_retry_hooks = Array(on_retry)
  @on_failure = on_failure
end
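The constructor stores its options with very little transformation, so a short sketch may help show what each option implies. The `Array()` calls mirror the `Array(on_retry)` line above. The `retryable?` and `notify_failure` helpers are hypothetical names invented for this example; they only illustrate the documented behavior ("nil means retry all", and hook exceptions being swallowed) and are not taken from the library's source.

```ruby
# Array() turns nil into [], wraps a single Proc, and leaves an Array as-is,
# which is why on_retry accepts nil, one callback, or a list of callbacks.
hook_a = -> { :a }
hook_b = -> { :b }
no_hooks  = Array(nil)
one_hook  = Array(hook_a)
two_hooks = Array([hook_a, hook_b])

# Hypothetical helper: one way a retry_on filter could be evaluated.
# A nil filter means every error is retryable.
def retryable?(error, retry_on)
  retry_on.nil? || retry_on.any? { |klass| error.is_a?(klass) }
end

# Hypothetical helper: calls the on_failure hook if one is set and swallows
# anything it raises, so a faulty hook cannot interrupt processing.
def notify_failure(hook, item, error)
  hook&.call(item, error)
rescue StandardError
  nil
end
```

Because the filter uses `is_a?`, listing a parent class such as `IOError` would also cover its subclasses (for example `EOFError`) under this sketch.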

Instance Method Details

#call(items) {|item| ... } ⇒ Result

Process a collection of items with retry logic.

Parameters:

  • items (Array)

    items to process

Yields:

  • (item)

    block that processes a single item; raise to signal failure

Returns:

  • (Result)

    processing result with succeeded, failed, and stats

Raises:

  • (Error)

    if no processing block is given

# File 'lib/philiprehberger/retry_queue/processor.rb', line 34

def call(items, &block)
  raise Error, 'a processing block is required' unless block

  succeeded = []
  failed = []
  start_time = now

  items.each do |item|
    process_item(item, succeeded, failed, &block)
  end

  Result.new(succeeded: succeeded, failed: failed, elapsed: now - start_time)
end
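The listing above delegates each item to a private `process_item` method that this page does not show. The following is a minimal stand-in for that step, written only to illustrate the retry, backoff, and dead-letter flow described in the overview. The method name `process_with_retry`, the `sleeper:` parameter, and the shape of the failed entries are all assumptions made for this sketch and may differ from the actual implementation.

```ruby
# Hypothetical stand-in for the per-item step; not the library's code.
# sleeper: is an addition for this sketch so delays can be recorded
# instead of actually sleeping.
def process_with_retry(item, succeeded, failed, max_retries:, backoff:,
                       sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield item
    succeeded << item
  rescue StandardError => e
    if attempt < max_retries
      sleeper.call(backoff.call(attempt)) # wait before the next attempt
      attempt += 1
      retry                               # re-run the begin block
    end
    # Retries exhausted: record the item in the dead-letter list.
    failed << { item: item, error: e }
  end
end

succeeded = []
failed = []
slept = []
calls = Hash.new(0)
backoff = ->(attempt) { 0.1 * (2**attempt) }
record_sleep = ->(seconds) { slept << seconds }

[1, 2, 3].each do |item|
  process_with_retry(item, succeeded, failed,
                     max_retries: 2, backoff: backoff, sleeper: record_sleep) do |i|
    calls[i] += 1
    raise ArgumentError, 'always fails' if i == 2
    raise IOError, 'fails once' if i == 3 && calls[i] < 2
  end
end
```

In this run, item 1 succeeds on the first call, item 3 succeeds after one retry, and item 2 is attempted three times (one initial call plus two retries) before landing in `failed`.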