Class: Philiprehberger::ParallelEach::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/parallel_each/worker_pool.rb

Overview

Thread pool that processes work items from a queue. Each item is a [index, element] pair. Results are collected with their original index so callers can reassemble order.

Defined Under Namespace

Classes: Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency:) ⇒ WorkerPool

Returns a new instance of WorkerPool.



13
14
15
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 13

def initialize(concurrency:)
  @concurrency = [concurrency, 1].max
end

Instance Attribute Details

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def concurrency
  @concurrency
end

Instance Method Details

#run(collection, &block) ⇒ Object

Processes each element of the collection through the block using a thread pool. Returns an array of Result structs sorted by index.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 19

def run(collection, &block)
  queue = Queue.new
  collection.each_with_index { |item, idx| queue << [idx, item] }
  @concurrency.times { queue << :stop }

  results = []
  mutex = Mutex.new
  first_error = nil
  error_mutex = Mutex.new

  threads = Array.new(@concurrency) do
    Thread.new do
      loop do
        work = queue.pop
        break if work == :stop

        idx, item = work

        # Skip remaining work if an error has occurred
        next if first_error

        begin
          value = block.call(item)
          mutex.synchronize { results << Result.new(index: idx, value: value) }
        rescue StandardError => e
          error_mutex.synchronize { first_error ||= e }
        end
      end
    end
  end

  threads.each(&:join)

  raise first_error if first_error

  results.sort_by(&:index)
end