Module: Evilution::Parallel::WorkQueue::Worker::Loop

Defined in:
lib/evilution/parallel/work_queue/worker/loop.rb

Class Method Summary

Class Method Details

.run(cmd_io, res_io, hooks: nil, &block) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/evilution/parallel/work_queue/worker/loop.rb', line 10

# Runs the worker's command loop and then terminates the current process.
#
# Fires +:worker_process_start+ on +hooks+ when one is supplied, then reads
# messages from +cmd_io+ and passes each to +run_one+ until the channel
# yields +nil+ or the +SHUTDOWN+ sentinel. The values returned by +run_one+
# are summed into the "busy" figure; once the loop ends, a
# <tt>[STATS, busy, wall]</tt> message is written to +res_io+, where +wall+
# is the monotonic-clock time elapsed since the loop was set up.
#
# This method never returns to its caller: the +ensure+ clause closes both
# IO objects (if still open) and calls +exit!+, whether or not an exception
# was raised along the way.
#
# @param cmd_io [#close, #closed?] channel end that commands are read from
# @param res_io [#close, #closed?] channel end that results and stats are written to
# @param hooks [#fire, nil] optional hook registry notified at start-up
# @yield [item] forwarded to +run_one+ for every received message
# @return [void]
def run(cmd_io, res_io, hooks: nil, &block)
  if hooks
    hooks.fire(:worker_process_start)
  end

  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  work_queue = Evilution::Parallel::WorkQueue
  busy_seconds = 0.0

  loop do
    message = work_queue::Channel.read(cmd_io)
    break if message.nil?
    break if message == work_queue::SHUTDOWN

    busy_seconds += run_one(res_io, message, &block)
  end

  wall_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  stats = [work_queue::STATS, busy_seconds, wall_seconds]
  work_queue::Channel.write(res_io, stats)
ensure
  # Close the command side first, then the result side, then hard-exit
  # without running at_exit handlers.
  [cmd_io, res_io].each { |io| io.close unless io.closed? }
  exit!
end

.run_one(res_io, data, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/evilution/parallel/work_queue/worker/loop.rb', line 32

# Processes a single work item and reports the outcome on +res_io+.
#
# +data+ is destructured into an index and an item. The block is called with
# the item; on success <tt>[index, :ok, result]</tt> is written to +res_io+.
# If anything raises inside that section (the block call or the success
# write), <tt>[index, :error, exception]</tt> is written instead.
#
# @param res_io [Object] channel end handed to +Channel.write+
# @param data [Array] an <tt>[index, item]</tt> pair
# @yield [item] the work to perform for this item
# @return [Float] monotonic-clock time between the start of the call and the
#   end of the block call (or the moment the exception was caught)
def run_one(res_io, data, &block)
  index, item = data
  began_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  since_start = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) - began_at }

  begin
    outcome = block.call(item)
    # Stop the clock before writing so the success timing covers the block only.
    duration = since_start.call
    reply = [index, :ok, outcome]
    Evilution::Parallel::WorkQueue::Channel.write(res_io, reply)
    duration
  rescue Exception => failure # rubocop:disable Lint/RescueException
    # Re-measure: on this path the timing runs up to the point of failure.
    duration = since_start.call
    reply = [index, :error, failure]
    Evilution::Parallel::WorkQueue::Channel.write(res_io, reply)
    duration
  end
end