Class: Philiprehberger::ParallelEach::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/philiprehberger/parallel_each/worker_pool.rb

Overview

Thread pool that processes work items from a queue. Each item is a [index, element] pair. Results are collected with their original index so callers can reassemble order.

Defined Under Namespace

Classes: Result

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency:) ⇒ WorkerPool

Returns a new instance of WorkerPool.



13
14
15
16
17
18
19
20
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 13

def initialize(concurrency:)
  @concurrency = [concurrency, 1].max
  @completed = 0
  @failed = 0
  @start_time = nil
  @end_time = nil
  @stats_mutex = Mutex.new
end

Instance Attribute Details

#completedObject (readonly)

Returns the value of attribute completed.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def completed
  @completed
end

#concurrencyObject (readonly)

Returns the value of attribute concurrency.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def concurrency
  @concurrency
end

#end_timeObject (readonly)

Returns the value of attribute end_time.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def end_time
  @end_time
end

#failedObject (readonly)

Returns the value of attribute failed.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def failed
  @failed
end

#start_timeObject (readonly)

Returns the value of attribute start_time.



11
12
13
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 11

def start_time
  @start_time
end

Instance Method Details

#elapsed_secondsObject

Wall-clock duration of the most recent run, or nil if no run has finished.



23
24
25
26
27
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 23

def elapsed_seconds
  return nil if @start_time.nil? || @end_time.nil?

  @end_time - @start_time
end

#run(collection) ⇒ Object

Processes each element of the collection through the block using a thread pool. Returns an array of Result structs sorted by index.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 41

def run(collection, &)
  queue = build_queue(collection)
  results = []
  mutex = Mutex.new
  error_state = { first: nil, mutex: Mutex.new }

  reset_stats
  threads = spawn_workers(queue, results, mutex, error_state, &)
  threads.each(&:join)
  finalize_stats

  ParallelEach.send(:record_stats, stats)
  raise error_state[:first] if error_state[:first]

  results.sort_by(&:index)
end

#statsObject

Snapshot of execution stats from the most recent run.



30
31
32
33
34
35
36
37
# File 'lib/philiprehberger/parallel_each/worker_pool.rb', line 30

def stats
  {
    workers: @concurrency,
    completed: @completed,
    failed: @failed,
    elapsed_seconds: elapsed_seconds
  }
end