Class: Philiprehberger::ParallelEach::WorkerPool
- Inherits:
-
Object
- Object
- Philiprehberger::ParallelEach::WorkerPool
- Defined in:
- lib/philiprehberger/parallel_each/worker_pool.rb
Overview
Thread pool that processes work items from a queue. Each item is a [index, element] pair. Results are collected with their original index so callers can reassemble order.
Defined Under Namespace
Classes: Result
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
Instance Method Summary collapse
-
#initialize(concurrency:) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
-
#run(collection, &block) ⇒ Object
Processes each element of the collection through the block using a thread pool.
Constructor Details
#initialize(concurrency:) ⇒ WorkerPool
Returns a new instance of WorkerPool.
13 14 15 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 13 def initialize(concurrency:) @concurrency = [concurrency, 1].max end |
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def concurrency @concurrency end |
Instance Method Details
#run(collection, &block) ⇒ Object
Processes each element of the collection through the block using a thread pool. Returns an array of Result structs sorted by index.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 19 def run(collection, &block) queue = Queue.new collection.each_with_index { |item, idx| queue << [idx, item] } @concurrency.times { queue << :stop } results = [] mutex = Mutex.new first_error = nil error_mutex = Mutex.new threads = Array.new(@concurrency) do Thread.new do loop do work = queue.pop break if work == :stop idx, item = work # Skip remaining work if an error has occurred next if first_error begin value = block.call(item) mutex.synchronize { results << Result.new(index: idx, value: value) } rescue StandardError => e error_mutex.synchronize { first_error ||= e } end end end end threads.each(&:join) raise first_error if first_error results.sort_by(&:index) end |