Class: Philiprehberger::Batch::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/batch/processor.rb

Overview

Processes collections in chunks with progress tracking and error collection.

Instance Method Summary collapse

Constructor Details

#initialize(size: 100, concurrency: 1, retries: 0) ⇒ Processor

Returns a new instance of Processor.

Parameters:

  • size (Integer) (defaults to: 100)

    chunk size

  • concurrency (Integer) (defaults to: 1)

    number of concurrent workers (default: 1)

  • retries (Integer) (defaults to: 0)

    max retries per failed item (default: 0)

Raises:



10
11
12
13
14
15
16
17
18
# File 'lib/philiprehberger/batch/processor.rb', line 10

def initialize(size: 100, concurrency: 1, retries: 0)
  raise Error, 'size must be positive' unless size.is_a?(Integer) && size.positive?
  raise Error, 'concurrency must be positive' unless concurrency.is_a?(Integer) && concurrency.positive?
  raise Error, 'retries must be non-negative' unless retries.is_a?(Integer) && retries >= 0

  @size = size
  @concurrency = concurrency
  @retries = retries
end

Instance Method Details

#call(collection, on_progress: nil) {|chunk| ... } ⇒ Result

Process a collection in batches.

Parameters:

  • collection (Array, Enumerable)

    items to process

  • on_progress (Proc, nil) (defaults to: nil)

    optional callback invoked after each chunk with a progress info hash

Yields:

  • (chunk)

    block that receives a Chunk object for processing

Returns:

  • (Result)

    processing result

Raises:



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/philiprehberger/batch/processor.rb', line 26

def call(collection, on_progress: nil, &block)
  raise Error, 'a processing block is required' unless block

  items = collection.to_a
  slices = items.each_slice(@size).to_a
  @on_progress = on_progress

  if @concurrency > 1 && slices.size > 1
    call_concurrent(slices, items, &block)
  else
    call_sequential(slices, items, &block)
  end
end