Class: Evilution::Parallel::WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue.rb

Defined Under Namespace

Classes: WorkerStat

Constant Summary collapse

SHUTDOWN =
:__shutdown__
STATS =
:__stats__
TIMING_GRACE_PERIOD =
5

Instance Method Summary collapse

Constructor Details

#initialize(size:, hooks: nil, prefetch: 1, item_timeout: nil, worker_max_items: nil) ⇒ WorkQueue

Returns a new instance of WorkQueue.



24
25
26
27
28
29
30
31
32
# File 'lib/evilution/parallel/work_queue.rb', line 24

def initialize(size:, hooks: nil, prefetch: 1, item_timeout: nil, worker_max_items: nil)
  validate_init_args(size, prefetch, item_timeout, worker_max_items)
  @size = size
  @hooks = hooks
  @prefetch = prefetch
  @item_timeout = item_timeout
  @worker_max_items = worker_max_items
  @worker_stats = []
end

Instance Method Details

#map(items, &block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/evilution/parallel/work_queue.rb', line 34

def map(items, &block)
  return [] if items.empty?

  @block = block
  @retired_workers = []
  worker_count = [@size, items.length].min
  workers = spawn_workers(worker_count, &block)

  begin
    distribute_and_collect(items, workers)
  ensure
    shutdown_workers(workers)
    @worker_stats = @retired_workers + build_worker_stats(workers)
    @block = nil
    @retired_workers = nil
  end
end

#worker_statsObject



52
53
54
# File 'lib/evilution/parallel/work_queue.rb', line 52

def worker_stats
  @worker_stats.map { |stat| stat.dup.freeze }
end