Class: Evilution::Parallel::WorkQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue/worker.rb

Defined Under Namespace

Modules: Loop

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker

Returns a new instance of Worker.



38
39
40
41
42
43
44
45
46
47
# File 'lib/evilution/parallel/work_queue/worker.rb', line 38

# Builds the parent-side handle for a worker process.
#
# @param pid [Integer] process id of the worker
# @param cmd_write [IO] pipe end used to send commands to the worker
# @param res_read [IO] pipe end used to read results from the worker
# @param worker_index [Integer] slot number assigned to this worker
def initialize(pid:, cmd_write:, res_read:, worker_index:)
  # Multiple assignment stores left to right, so instance variables are
  # created in the same order as one-per-line assignments would.
  @pid, @cmd_write, @res_read, @worker_index = pid, cmd_write, res_read, worker_index
  # Counters start at zero; timers start at zero seconds as floats.
  @items_completed, @pending = 0, 0
  @busy_time, @wall_time = 0.0, 0.0
end

Instance Attribute Details

#busy_time ⇒ Object

Returns the value of attribute busy_time.



10
11
12
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10

# Reader for the busy-time figure held in @busy_time.
#
# @return [Object] the current value of @busy_time
def busy_time = @busy_time

#items_completed ⇒ Object

Returns the value of attribute items_completed.



10
11
12
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10

# Reader for the completed-item counter held in @items_completed.
#
# @return [Object] the current value of @items_completed
def items_completed = @items_completed

#pending ⇒ Object

Returns the value of attribute pending.



10
11
12
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10

# Reader for the pending counter held in @pending.
#
# @return [Object] the current value of @pending
def pending = @pending

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



9
10
11
# File 'lib/evilution/parallel/work_queue/worker.rb', line 9

# Read-only accessor for the worker's process id.
#
# @return [Object] the current value of @pid
def pid = @pid

#wall_time ⇒ Object

Returns the value of attribute wall_time.



10
11
12
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10

# Reader for the wall-time figure held in @wall_time.
#
# @return [Object] the current value of @wall_time
def wall_time = @wall_time

#worker_index ⇒ Object (readonly)

Returns the value of attribute worker_index.



9
10
11
# File 'lib/evilution/parallel/work_queue/worker.rb', line 9

# Read-only accessor for the slot number this worker was created with.
#
# @return [Object] the current value of @worker_index
def worker_index = @worker_index

Class Method Details

.spawn(worker_index:, hooks:, &block) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12

# Forks a worker process wired to the parent through two binary pipes
# (commands parent -> child, results child -> parent) and returns the
# parent-side handle for it.
#
# In the child: the parent-side pipe ends are closed, TEST_ENV_NUMBER is set
# from the worker slot, child output is redirected, and Loop.run is entered
# with the child-side pipe ends, the hooks and the given block.
#
# In the parent: the child-side pipe ends are closed and a new handle is
# built around the child's pid and the remaining pipe ends.
#
# If anything fails while setting up (for example IO.pipe hitting the
# descriptor limit, or Process.fork being refused or unsupported), every
# pipe end opened so far is closed before the error is re-raised, so a
# failed spawn does not leak file descriptors.
#
# @param worker_index [Integer] worker slot, passed to test_env_number_for
# @param hooks [Object] forwarded to Loop.run as `hooks:`
# @param block [Proc] forwarded to Loop.run
# @return [Object] the handle built by `new`
# @raise [StandardError, NotImplementedError] whatever IO.pipe, Process.fork
#   or `new` raised, after the pipes have been cleaned up
def self.spawn(worker_index:, hooks:, &block)
  pipes = []
  cmd_read, cmd_write = IO.pipe.tap { |pair| pipes.concat(pair) }
  res_read, res_write = IO.pipe.tap { |pair| pipes.concat(pair) }
  pipes.each(&:binmode)

  pid = Process.fork do
    cmd_write.close
    res_read.close
    ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index)
    Evilution::ChildOutput.redirect!
    Loop.run(cmd_read, res_write, hooks: hooks, &block)
  end

  cmd_read.close
  res_write.close
  new(pid: pid, cmd_write: cmd_write, res_read: res_read, worker_index: worker_index)
rescue StandardError, NotImplementedError
  # NotImplementedError (fork unavailable on this platform) is a ScriptError,
  # not a StandardError, so it has to be listed explicitly.
  pipes.each { |io| io.close unless io.closed? }
  raise
end

.test_env_number_for(worker_index) ⇒ Object

EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.



34
35
36
# File 'lib/evilution/parallel/work_queue/worker.rb', line 34

# Maps a 0-based worker slot to a TEST_ENV_NUMBER value: the empty string
# for slot 0, otherwise the slot number plus one as a string ("2" for
# slot 1, "3" for slot 2, ...).
#
# @param worker_index [Integer] 0-based worker slot
# @return [String] value to export as TEST_ENV_NUMBER
def self.test_env_number_for(worker_index)
  return "" if worker_index.zero?

  one_based = worker_index + 1
  one_based.to_s
end

Instance Method Details

#close_pipes ⇒ Object



74
75
76
77
# File 'lib/evilution/parallel/work_queue/worker.rb', line 74

# Closes the parent-side pipe ends (command writer first, then result
# reader), skipping any end that is already closed, so repeated calls are
# harmless.
#
# @return [nil] when the reader was already closed; otherwise whatever the
#   reader's #close returns
def close_pipes
  writer = @cmd_write
  reader = @res_read

  writer.close unless writer.closed?
  return if reader.closed?

  reader.close
end

#kill ⇒ Object



68
69
70
71
72
# File 'lib/evilution/parallel/work_queue/worker.rb', line 68

# Sends SIGKILL to the worker process.
#
# @return [Integer, nil] the result of Process.kill, or nil when no process
#   with that pid exists (Errno::ESRCH is swallowed)
def kill
  target = @pid
  begin
    Process.kill("KILL", target)
  rescue Errno::ESRCH
    # The process is already gone; nothing left to kill.
    nil
  end
end

#read_result ⇒ Object



58
59
60
# File 'lib/evilution/parallel/work_queue/worker.rb', line 58

# Reads one message from the worker's result pipe via Channel.read.
#
# @return [Object] whatever Channel.read returns for @res_read
def read_result
  channel = Evilution::Parallel::WorkQueue::Channel
  channel.read(@res_read)
end

#reap ⇒ Object



79
80
81
82
83
# File 'lib/evilution/parallel/work_queue/worker.rb', line 79

# Waits for the worker process to exit and collects its status.
#
# @return [Integer, nil] the pid returned by the wait call, or nil when
#   there is no such child to wait for (Errno::ECHILD is swallowed)
def reap
  child = @pid
  begin
    Process.waitpid(child)
  rescue Errno::ECHILD
    # No such child to wait for, e.g. it has already been reaped.
    nil
  end
end

#res_io ⇒ Object



49
50
51
# File 'lib/evilution/parallel/work_queue/worker.rb', line 49

# Exposes the result-pipe read end held in @res_read.
#
# @return [Object] the current value of @res_read
def res_io = @res_read

#retire ⇒ Object



85
86
87
88
89
90
91
92
93
# File 'lib/evilution/parallel/work_queue/worker.rb', line 85

# Shuts the worker down and returns its final statistics.
#
# The order matters: send the shutdown command, take the timing pair from
# drain_stats, close the pipes, reap the child, and only then store the
# timings and build the stat.
#
# @return [Object] the result of to_stat
def retire
  shutdown
  busy_seconds, wall_seconds = drain_stats
  close_pipes
  reap
  @busy_time, @wall_time = busy_seconds, wall_seconds
  to_stat
end

#send_item(index, item) ⇒ Object



53
54
55
56
# File 'lib/evilution/parallel/work_queue/worker.rb', line 53

# Writes an [index, item] pair to the worker's command pipe via
# Channel.write and increments the pending counter.
#
# @param index [Object] first element of the pair sent to the worker
# @param item [Object] second element of the pair sent to the worker
# @return [Integer] the pending count after the increment
def send_item(index, item)
  message = [index, item]
  Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, message)
  @pending = @pending + 1
end

#shutdown ⇒ Object



62
63
64
65
66
# File 'lib/evilution/parallel/work_queue/worker.rb', line 62

# Writes the SHUTDOWN command to the worker's command pipe via
# Channel.write.
#
# @return [Object, nil] the result of Channel.write, or nil when the write
#   raises Errno::EPIPE (which is swallowed)
def shutdown
  work_queue = Evilution::Parallel::WorkQueue
  begin
    work_queue::Channel.write(@cmd_write, work_queue::SHUTDOWN)
  rescue Errno::EPIPE
    # Broken pipe on the command channel: nothing to deliver the command to.
    nil
  end
end

#to_stat ⇒ Object



95
96
97
98
99
# File 'lib/evilution/parallel/work_queue/worker.rb', line 95

# Builds a WorkerStat from the pid, the completed-item count and the two
# timers. A nil or false timer is reported as 0.0.
#
# @return [Evilution::Parallel::WorkQueue::WorkerStat]
def to_stat
  busy = @busy_time || 0.0
  wall = @wall_time || 0.0
  Evilution::Parallel::WorkQueue::WorkerStat.new(@pid, @items_completed, busy, wall)
end