Class: Evilution::Parallel::WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue/collection_state.rb,
lib/evilution/parallel/work_queue.rb,
lib/evilution/parallel/work_queue/worker_stat.rb

Overview

CollectionState is a private constant defined directly on WorkQueue (not under a sub-namespace), so Dispatcher accesses it via const_get.

Defined Under Namespace

Modules: Channel, Validators Classes: Dispatcher, Worker, WorkerStat

Constant Summary collapse

SHUTDOWN =
:__shutdown__
STATS =
:__stats__
TIMING_GRACE_PERIOD =
5

Instance Method Summary collapse

Constructor Details

#initialize(size:, hooks: nil, prefetch: 1, item_timeout: nil, worker_max_items: nil) ⇒ WorkQueue

Returns a new instance of WorkQueue.



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/evilution/parallel/work_queue.rb', line 12

# Validates the sizing options and stores the queue configuration.
#
# @param size [Object] checked with Validators::PositiveInt
# @param hooks [Object, nil] stored as given; no validation is applied here
# @param prefetch [Object] checked with Validators::PositiveInt
# @param item_timeout [Object, nil] checked with Validators::OptionalPositiveNumber
# @param worker_max_items [Object, nil] checked with Validators::OptionalPositiveInt
# NOTE(review): the validators presumably raise on bad input — confirm in Validators.
def initialize(size:, hooks: nil, prefetch: 1, item_timeout: nil, worker_max_items: nil)
  # Each row is [validator, option name, value]; rows are checked top to bottom.
  [
    [Validators::PositiveInt, :size, size],
    [Validators::PositiveInt, :prefetch, prefetch],
    [Validators::OptionalPositiveNumber, :item_timeout, item_timeout],
    [Validators::OptionalPositiveInt, :worker_max_items, worker_max_items]
  ].each { |validator, option, value| validator.call!(option, value) }

  @size = size
  @prefetch = prefetch
  @item_timeout = item_timeout
  @worker_max_items = worker_max_items
  @hooks = hooks
  # Starts empty; #map replaces it when a run finishes.
  @worker_stats = []
end

Instance Method Details

#map(items, &block) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/evilution/parallel/work_queue.rb', line 26

# Hands +items+ to a Dispatcher backed by a pool of spawned workers and
# returns the dispatcher's results.
#
# Returns an empty array immediately when +items+ is empty. Otherwise the pool
# holds @size workers, capped at the number of items. If the dispatcher reports
# a first_error after running, that error is raised instead of returning.
#
# Whether the run succeeds or raises, the workers are shut down, their final
# timings collected, their pipes closed and the workers reaped, in that order,
# and @worker_stats is replaced with the retired stats plus one stat per worker.
#
# @param items [#empty?, #length] the work items to process
# @param block the block forwarded to every spawned worker
# @return [Object] the first element of the pair returned by Dispatcher#run
def map(items, &block)
  return [] if items.empty?

  pool_size = [@size, items.length].min
  workers = Array.new(pool_size) { |index| spawn_one(index, &block) }
  # A replacement worker reuses the index of the worker it stands in for.
  respawn = lambda { |old| spawn_one(old.worker_index, &block) }
  dispatcher = Dispatcher.new(
    workers: workers,
    items: items,
    prefetch: @prefetch,
    item_timeout: @item_timeout,
    worker_max_items: @worker_max_items,
    recycle_factory: respawn
  )

  # Stays empty if the dispatcher raises before returning its pair.
  retired = []
  begin
    results, retired = dispatcher.run
    if dispatcher.first_error
      raise dispatcher.first_error
    end

    results
  ensure
    workers.each(&:shutdown)
    collect_final_timings(workers)
    workers.each(&:close_pipes)
    workers.each(&:reap)
    live_stats = workers.map(&:to_stat)
    @worker_stats = retired + live_stats
  end
end

#worker_stats ⇒ Object



51
52
53
# File 'lib/evilution/parallel/work_queue.rb', line 51

# Returns a new array holding a frozen duplicate of every recorded worker stat,
# so callers cannot mutate the stats stored in @worker_stats.
#
# @return [Array] frozen copies of the entries in @worker_stats
def worker_stats
  # Array#each returns its receiver, so the array of copies is the return value.
  @worker_stats.map(&:dup).each(&:freeze)
end