Class: Philiprehberger::ParallelEach::WorkerPool
- Inherits:
-
Object
- Object
- Philiprehberger::ParallelEach::WorkerPool
- Defined in:
- lib/philiprehberger/parallel_each/worker_pool.rb
Overview
Thread pool that processes work items from a queue. Each item is a [index, element] pair. Results are collected with their original index so callers can reassemble order.
Defined Under Namespace
Classes: Result
Instance Attribute Summary collapse
-
#completed ⇒ Object
readonly
Returns the value of attribute completed.
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
-
#end_time ⇒ Object
readonly
Returns the value of attribute end_time.
-
#failed ⇒ Object
readonly
Returns the value of attribute failed.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
Instance Method Summary collapse
-
#elapsed_seconds ⇒ Object
Wall-clock duration of the most recent run, or nil if no run has finished.
-
#initialize(concurrency:) ⇒ WorkerPool
constructor
A new instance of WorkerPool.
-
#run(collection) ⇒ Object
Processes each element of the collection through the block using a thread pool.
-
#stats ⇒ Object
Snapshot of execution stats from the most recent run.
Constructor Details
#initialize(concurrency:) ⇒ WorkerPool
Returns a new instance of WorkerPool.
13 14 15 16 17 18 19 20 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 13 def initialize(concurrency:) @concurrency = [concurrency, 1].max @completed = 0 @failed = 0 @start_time = nil @end_time = nil @stats_mutex = Mutex.new end |
Instance Attribute Details
#completed ⇒ Object (readonly)
Returns the value of attribute completed.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def completed @completed end |
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def concurrency @concurrency end |
#end_time ⇒ Object (readonly)
Returns the value of attribute end_time.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def end_time @end_time end |
#failed ⇒ Object (readonly)
Returns the value of attribute failed.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def failed @failed end |
#start_time ⇒ Object (readonly)
Returns the value of attribute start_time.
11 12 13 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11 def start_time @start_time end |
Instance Method Details
#elapsed_seconds ⇒ Object
Wall-clock duration of the most recent run, or nil if no run has finished.
23 24 25 26 27 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 23 def elapsed_seconds return nil if @start_time.nil? || @end_time.nil? @end_time - @start_time end |
#run(collection) ⇒ Object
Processes each element of the collection through the block using a thread pool. Returns an array of Result structs sorted by index.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 41 def run(collection, &) queue = build_queue(collection) results = [] mutex = Mutex.new error_state = { first: nil, mutex: Mutex.new } reset_stats threads = spawn_workers(queue, results, mutex, error_state, &) threads.each(&:join) finalize_stats ParallelEach.send(:record_stats, stats) raise error_state[:first] if error_state[:first] results.sort_by(&:index) end |
#stats ⇒ Object
Snapshot of execution stats from the most recent run.
30 31 32 33 34 35 36 37 |
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 30 def stats { workers: @concurrency, completed: @completed, failed: @failed, elapsed_seconds: elapsed_seconds } end |