Class: Evilution::Parallel::WorkQueue::Worker
- Inherits: Object
  - Object
  - Evilution::Parallel::WorkQueue::Worker
- Defined in: lib/evilution/parallel/work_queue/worker.rb
Instance Attribute Summary
- #busy_time ⇒ Object
  Returns the value of attribute busy_time.
- #items_completed ⇒ Object
  Returns the value of attribute items_completed.
- #pending ⇒ Object
  Returns the value of attribute pending.
- #pid ⇒ Object (readonly)
  Returns the value of attribute pid.
- #wall_time ⇒ Object
  Returns the value of attribute wall_time.
- #worker_index ⇒ Object (readonly)
  Returns the value of attribute worker_index.
Class Method Summary
- .spawn(worker_index:, hooks:, &block) ⇒ Object
- .test_env_number_for(worker_index) ⇒ Object
  EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …).
Instance Method Summary
- #close_pipes ⇒ Object
- #initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker (constructor)
  A new instance of Worker.
- #kill ⇒ Object
- #read_result ⇒ Object
- #reap ⇒ Object
- #res_io ⇒ Object
- #retire ⇒ Object
- #send_item(index, item) ⇒ Object
- #shutdown ⇒ Object
- #to_stat ⇒ Object
Constructor Details
#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 40
def initialize(pid:, cmd_write:, res_read:, worker_index:)
  @pid = pid
  @cmd_write = cmd_write
  @res_read = res_read
  @worker_index = worker_index
  @items_completed = 0
  @pending = 0
  @busy_time = 0.0
  @wall_time = 0.0
end
Instance Attribute Details
#busy_time ⇒ Object
Returns the value of attribute busy_time.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12
def busy_time
  @busy_time
end
#items_completed ⇒ Object
Returns the value of attribute items_completed.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12
def items_completed
  @items_completed
end
#pending ⇒ Object
Returns the value of attribute pending.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12
def pending
  @pending
end
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 11
def pid
  @pid
end
#wall_time ⇒ Object
Returns the value of attribute wall_time.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12
def wall_time
  @wall_time
end
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 11
def worker_index
  @worker_index
end
Class Method Details
.spawn(worker_index:, hooks:, &block) ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def self.spawn(worker_index:, hooks:, &block)
  cmd_read, cmd_write = IO.pipe
  res_read, res_write = IO.pipe
  [cmd_read, cmd_write, res_read, res_write].each(&:binmode)

  pid = Process.fork do
    cmd_write.close
    res_read.close
    ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index)
    Evilution::ChildOutput.redirect!
    Loop.run(cmd_read, res_write, hooks: hooks, &block)
  end

  cmd_read.close
  res_write.close
  new(pid: pid, cmd_write: cmd_write, res_read: res_read, worker_index: worker_index)
end
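The two-pipe fork pattern used by .spawn can be tried in isolation. The sketch below is not Evilution's implementation: `spawn_echo_worker` and `round_trip` are hypothetical helpers, Marshal framing stands in for `Channel` (whose wire format is not shown on this page), and closing the command pipe stands in for the `SHUTDOWN` message. It assumes a platform where `Process.fork` is available.

```ruby
# Hypothetical stand-in for Worker.spawn: one pipe carries commands to the
# child, the other carries results back to the parent.
def spawn_echo_worker
  cmd_read, cmd_write = IO.pipe
  res_read, res_write = IO.pipe
  [cmd_read, cmd_write, res_read, res_write].each(&:binmode)

  pid = Process.fork do
    # Child: close the pipe ends that belong to the parent.
    cmd_write.close
    res_read.close
    # Assumption: EOF on the command pipe ends the loop (the real worker
    # receives an explicit SHUTDOWN message instead).
    until cmd_read.eof?
      index, item = Marshal.load(cmd_read)
      Marshal.dump([index, yield(item)], res_write)
      res_write.flush
    end
    exit!(0)
  end

  # Parent: close the pipe ends that belong to the child.
  cmd_read.close
  res_write.close
  [pid, cmd_write, res_read]
end

# Sends each item to the child and reads back [index, result] pairs.
def round_trip(items)
  pid, cmd_write, res_read = spawn_echo_worker { |item| item * 2 }
  results = items.each_with_index.map do |item, index|
    Marshal.dump([index, item], cmd_write)
    cmd_write.flush
    Marshal.load(res_read)
  end
  cmd_write.close   # child sees EOF and exits
  Process.wait(pid) # reap the child so it does not linger as a zombie
  res_read.close
  results
end

p round_trip([1, 2, 3])
```

Closing the unused ends in each process matters: if the child kept its copy of `cmd_write` open, it would never observe EOF on `cmd_read`.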
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 36
def self.test_env_number_for(worker_index)
  worker_index.zero? ? "" : (worker_index + 1).to_s
end
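To make the slot-to-suffix mapping concrete, the sketch below copies the one-line mapping into a standalone method and feeds it into a file name. `database_file_for` and the `db/test…sqlite3` naming are hypothetical, standing in for whatever a given app interpolates into its database.yml.

```ruby
# Standalone copy of the mapping shown above, so the snippet runs without
# loading Evilution.
def test_env_number_for(worker_index)
  worker_index.zero? ? "" : (worker_index + 1).to_s
end

# Hypothetical naming scheme: an app interpolating TEST_ENV_NUMBER into its
# database path would end up with one file per worker slot.
def database_file_for(worker_index)
  "db/test#{test_env_number_for(worker_index)}.sqlite3"
end

(0..3).each do |slot|
  puts "slot #{slot}: TEST_ENV_NUMBER=#{test_env_number_for(slot).inspect} " \
       "file=#{database_file_for(slot)}"
end
```

Note that the suffix "1" is never produced: slot 0 maps to the empty string and slot 1 maps to "2", matching the convention described above.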
Instance Method Details
#close_pipes ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 76
def close_pipes
  @cmd_write.close unless @cmd_write.closed?
  @res_read.close unless @res_read.closed?
end
#kill ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 70
def kill
  Process.kill("KILL", @pid)
rescue Errno::ESRCH
  nil
end
#read_result ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 60
def read_result
  Evilution::Parallel::WorkQueue::Channel.read(@res_read)
end
#reap ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 81
def reap
  Process.wait(@pid)
rescue Errno::ECHILD
  nil
end
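Both #kill and #reap swallow the "already gone" error for their system call, which makes teardown safe to repeat. The sketch below illustrates that behaviour with standalone helpers; `safe_kill`, `safe_reap`, and `demo_teardown` are hypothetical names, not part of Evilution, and the snippet assumes `Process.fork` is available.

```ruby
# Same shape as #kill: ESRCH (no such process) is treated as "nothing to do".
def safe_kill(pid)
  Process.kill("KILL", pid)
rescue Errno::ESRCH
  nil
end

# Same shape as #reap: ECHILD (no such child) is treated as "already reaped".
def safe_reap(pid)
  Process.wait(pid)
rescue Errno::ECHILD
  nil
end

def demo_teardown
  pid = Process.fork { sleep }     # child blocks until it is killed
  signalled = safe_kill(pid)       # number of processes signalled
  first_reap = safe_reap(pid)      # returns the child's pid
  second_reap = safe_reap(pid)     # ECHILD is rescued, so this is nil
  [signalled, first_reap == pid, second_reap]
end

p demo_teardown
```

The child is killed before it is reaped on purpose: once a pid has been reaped the operating system may reuse it, so signalling a stale pid is best avoided in real code.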
#res_io ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 51
def res_io
  @res_read
end
#retire ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 87
def retire
  shutdown
  timing = drain_stats
  close_pipes
  reap
  @busy_time = timing.busy
  @wall_time = timing.wall
  to_stat
end
#send_item(index, item) ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 55
def send_item(index, item)
  Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, [index, item])
  @pending += 1
end
#shutdown ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 64
def shutdown
  Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, Evilution::Parallel::WorkQueue::SHUTDOWN)
rescue Errno::EPIPE
  nil
end
#to_stat ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 97
def to_stat
  Evilution::Parallel::WorkQueue::WorkerStat.new(
    @pid, @items_completed, @busy_time || 0.0, @wall_time || 0.0
  )
end