Class: Evilution::Parallel::WorkQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue/worker.rb

Defined Under Namespace

Modules: Loop. Classes: Timing.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker

Returns a new instance of Worker.



40
41
42
43
44
45
46
47
48
49
# File 'lib/evilution/parallel/work_queue/worker.rb', line 40

# Wraps an already-forked worker process together with the parent-side ends
# of its command and result pipes. All counters start at zero.
#
# @param pid [Integer] process id of the forked worker
# @param cmd_write [IO] write end of the pipe used to send commands
# @param res_read [IO] read end of the pipe used to receive results
# @param worker_index [Integer] 0-based slot of this worker
def initialize(pid:, cmd_write:, res_read:, worker_index:)
  # Identity of the child process and its slot.
  @pid = pid
  @worker_index = worker_index

  # Parent-side pipe ends.
  @cmd_write = cmd_write
  @res_read = res_read

  # Bookkeeping: nothing dispatched, nothing finished, no time recorded yet.
  @items_completed = @pending = 0
  @busy_time = @wall_time = 0.0
end

Instance Attribute Details

#busy_time ⇒ Object

Returns the value of attribute busy_time.



12
13
14
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12

# Reader for the worker's recorded busy time.
#
# The value starts at 0.0 and is overwritten by #retire with the `busy`
# value taken from the drained timing data.
#
# @return [Float, nil] presumably seconds; the `|| 0.0` fallback in #to_stat
#   suggests it can be nil — confirm against Timing
def busy_time
  @busy_time
end

#items_completed ⇒ Object

Returns the value of attribute items_completed.



12
13
14
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12

# Reader for the number of items this worker has completed.
#
# Initialised to 0 and reported through #to_stat. None of the methods shown
# here increments it, so it is presumably updated by the owning queue through
# the attribute writer — verify against the caller.
#
# @return [Integer]
def items_completed
  @items_completed
end

#pending ⇒ Object

Returns the value of attribute pending.



12
13
14
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12

# Reader for the count of items sent to the worker.
#
# Initialised to 0 and incremented by one on every #send_item. Nothing shown
# here decrements it; presumably the caller does so when a result arrives —
# verify against the queue.
#
# @return [Integer]
def pending
  @pending
end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



11
12
13
# File 'lib/evilution/parallel/work_queue/worker.rb', line 11

# Reader for the process id given at construction; .spawn passes the value
# returned by Process.fork.
#
# @return [Integer]
def pid
  @pid
end

#wall_time ⇒ Object

Returns the value of attribute wall_time.



12
13
14
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12

# Reader for the worker's recorded wall time.
#
# The value starts at 0.0 and is overwritten by #retire with the `wall`
# value taken from the drained timing data.
#
# @return [Float, nil] presumably seconds; the `|| 0.0` fallback in #to_stat
#   suggests it can be nil — confirm against Timing
def wall_time
  @wall_time
end

#worker_index ⇒ Object (readonly)

Returns the value of attribute worker_index.



11
12
13
# File 'lib/evilution/parallel/work_queue/worker.rb', line 11

# Reader for the worker slot given at construction. .spawn uses the same
# value to derive the child's TEST_ENV_NUMBER.
#
# @return [Integer] 0-based slot number
def worker_index
  @worker_index
end

Class Method Details

.spawn(worker_index:, hooks:, &block) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Forks a worker process connected to the parent by two binary-mode pipes:
# one carrying commands to the child and one carrying results back.
#
# In the child, the parent-side pipe ends are closed, TEST_ENV_NUMBER is set
# for the slot, output is redirected, and Loop.run takes over with the
# child-side pipe ends. In the parent, the child-side ends are closed and a
# Worker wrapping the remaining ends is returned.
#
# @param worker_index [Integer] 0-based slot of the worker
# @param hooks [Object] forwarded unchanged to Loop.run
# @param block forwarded unchanged to Loop.run
# @return [Worker] handle for the forked process
def self.spawn(worker_index:, hooks:, &block)
  command_rd, command_wr = IO.pipe
  result_rd, result_wr = IO.pipe
  [command_rd, command_wr, result_rd, result_wr].each(&:binmode)

  child_pid = Process.fork do
    # Child side: drop the ends that belong to the parent.
    [command_wr, result_rd].each(&:close)
    ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index)
    Evilution::ChildOutput.redirect!
    Loop.run(command_rd, result_wr, hooks: hooks, &block)
  end

  # Parent side: drop the ends that belong to the child.
  [command_rd, result_wr].each(&:close)
  new(pid: child_pid, cmd_write: command_wr, res_read: result_rd, worker_index: worker_index)
end

.test_env_number_for(worker_index) ⇒ Object

EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.



36
37
38
# File 'lib/evilution/parallel/work_queue/worker.rb', line 36

# Maps a 0-based worker slot to a TEST_ENV_NUMBER value: the empty string for
# slot 0, otherwise the slot plus one rendered as a string ("2" for slot 1).
#
# @param worker_index [Integer] 0-based worker slot
# @return [String]
def self.test_env_number_for(worker_index)
  return "" if worker_index.zero?

  (worker_index + 1).to_s
end

Instance Method Details

#close_pipes ⇒ Object



76
77
78
79
# File 'lib/evilution/parallel/work_queue/worker.rb', line 76

# Closes the parent-side command and result pipe ends, skipping any that is
# already closed, so the method can be called more than once.
#
# @return [nil]
def close_pipes
  [@cmd_write, @res_read].each do |pipe_end|
    pipe_end.close unless pipe_end.closed?
  end
  nil
end

#kill ⇒ Object



70
71
72
73
74
# File 'lib/evilution/parallel/work_queue/worker.rb', line 70

# Sends SIGKILL to the worker process.
#
# @return [Integer, nil] the result of Process.kill, or nil when no process
#   with that pid exists
def kill
  Process.kill("KILL", @pid)
rescue Errno::ESRCH
  # The process is already gone; fall through and return nil.
end

#read_result ⇒ Object



60
61
62
# File 'lib/evilution/parallel/work_queue/worker.rb', line 60

# Reads one message from the worker's result pipe via Channel.read.
#
# @return [Object] whatever Channel.read returns for the result pipe
def read_result
  channel = Evilution::Parallel::WorkQueue::Channel
  channel.read(@res_read)
end

#reap ⇒ Object



81
82
83
84
85
# File 'lib/evilution/parallel/work_queue/worker.rb', line 81

# Waits for the worker process to exit.
#
# @return [Integer, nil] the result of Process.wait, or nil when there is no
#   such child left to wait for
def reap
  Process.wait(@pid)
rescue Errno::ECHILD
  # Nothing left to wait for; fall through and return nil.
end

#res_io ⇒ Object



51
52
53
# File 'lib/evilution/parallel/work_queue/worker.rb', line 51

# Exposes the IO passed to the constructor as `res_read`; .spawn supplies the
# read end of the result pipe. Presumably used by the caller to wait for
# readable results — confirm against the queue.
#
# @return [IO]
def res_io
  @res_read
end

#retire ⇒ Object



87
88
89
90
91
92
93
94
95
# File 'lib/evilution/parallel/work_queue/worker.rb', line 87

# Shuts the worker down and returns its final statistics.
#
# Runs, in order: send the shutdown message, drain the timing data, close the
# parent-side pipes, reap the child, copy the drained timing values into
# @busy_time and @wall_time, and build the stat.
#
# NOTE(review): if drain_stats raises, the pipes are not closed and the child
# is not reaped by this method.
#
# @return [Object] the result of #to_stat
def retire
  shutdown
  # drain_stats is defined outside this excerpt; its result must respond to
  # #busy and #wall (presumably a Timing — confirm).
  timing = drain_stats
  close_pipes
  reap
  @busy_time = timing.busy
  @wall_time = timing.wall
  to_stat
end

#send_item(index, item) ⇒ Object



55
56
57
58
# File 'lib/evilution/parallel/work_queue/worker.rb', line 55

# Writes one work item to the worker's command pipe and counts it as pending.
#
# @param index [Object] position of the item, sent alongside it
# @param item [Object] the work item itself
# @return [Integer] the pending count after the increment
def send_item(index, item)
  channel = Evilution::Parallel::WorkQueue::Channel
  payload = [index, item]
  channel.write(@cmd_write, payload)
  @pending += 1
end

#shutdown ⇒ Object



64
65
66
67
68
# File 'lib/evilution/parallel/work_queue/worker.rb', line 64

# Writes the SHUTDOWN message to the worker's command pipe.
#
# @return [Object, nil] the result of Channel.write, or nil when the write
#   fails with EPIPE
def shutdown
  work_queue = Evilution::Parallel::WorkQueue
  work_queue::Channel.write(@cmd_write, work_queue::SHUTDOWN)
rescue Errno::EPIPE
  # The write failed with a broken pipe; treat it as a no-op and return nil.
end

#to_stat ⇒ Object



97
98
99
100
101
# File 'lib/evilution/parallel/work_queue/worker.rb', line 97

# Builds a WorkerStat from the pid, the completed-item count and the recorded
# busy and wall times. A nil (or false) time is reported as 0.0.
#
# @return [Evilution::Parallel::WorkQueue::WorkerStat]
def to_stat
  busy = @busy_time || 0.0
  wall = @wall_time || 0.0
  Evilution::Parallel::WorkQueue::WorkerStat.new(@pid, @items_completed, busy, wall)
end