Class: Evilution::Parallel::WorkQueue::Worker
- Inherits:
-
Object
- Object
- Evilution::Parallel::WorkQueue::Worker
- Defined in:
- lib/evilution/parallel/work_queue/worker.rb
Defined Under Namespace
Modules: Loop
Instance Attribute Summary collapse
-
#busy_time ⇒ Object
Returns the value of attribute busy_time.
-
#items_completed ⇒ Object
Returns the value of attribute items_completed.
-
#pending ⇒ Object
Returns the value of attribute pending.
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
-
#wall_time ⇒ Object
Returns the value of attribute wall_time.
-
#worker_index ⇒ Object
readonly
Returns the value of attribute worker_index.
Class Method Summary collapse
- .spawn(worker_index:, hooks:, &block) ⇒ Object
-
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …).
Instance Method Summary collapse
- #close_pipes ⇒ Object
-
#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker
constructor
A new instance of Worker.
- #kill ⇒ Object
- #read_result ⇒ Object
- #reap ⇒ Object
- #res_io ⇒ Object
- #retire ⇒ Object
- #send_item(index, item) ⇒ Object
- #shutdown ⇒ Object
- #to_stat ⇒ Object
Constructor Details
#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker
Returns a new instance of Worker.
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 38 def initialize(pid:, cmd_write:, res_read:, worker_index:) @pid = pid @cmd_write = cmd_write @res_read = res_read @worker_index = worker_index @items_completed = 0 @pending = 0 @busy_time = 0.0 @wall_time = 0.0 end |
Instance Attribute Details
#busy_time ⇒ Object
Returns the value of attribute busy_time.
10 11 12 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10 def busy_time @busy_time end |
#items_completed ⇒ Object
Returns the value of attribute items_completed.
10 11 12 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10 def items_completed @items_completed end |
#pending ⇒ Object
Returns the value of attribute pending.
10 11 12 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10 def pending @pending end |
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
9 10 11 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 9 def pid @pid end |
#wall_time ⇒ Object
Returns the value of attribute wall_time.
10 11 12 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 10 def wall_time @wall_time end |
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
9 10 11 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 9 def worker_index @worker_index end |
Class Method Details
.spawn(worker_index:, hooks:, &block) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12 def self.spawn(worker_index:, hooks:, &block) cmd_read, cmd_write = IO.pipe res_read, res_write = IO.pipe [cmd_read, cmd_write, res_read, res_write].each(&:binmode) pid = Process.fork do cmd_write.close res_read.close ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index) Evilution::ChildOutput.redirect! Loop.run(cmd_read, res_write, hooks: hooks, &block) end cmd_read.close res_write.close new(pid: pid, cmd_write: cmd_write, res_read: res_read, worker_index: worker_index) end |
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.
34 35 36 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 34 def self.test_env_number_for(worker_index) worker_index.zero? ? "" : (worker_index + 1).to_s end |
Instance Method Details
#close_pipes ⇒ Object
74 75 76 77 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 74 def close_pipes @cmd_write.close unless @cmd_write.closed? @res_read.close unless @res_read.closed? end |
#kill ⇒ Object
68 69 70 71 72 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 68 def kill Process.kill("KILL", @pid) rescue Errno::ESRCH nil end |
#read_result ⇒ Object
58 59 60 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 58 def read_result Evilution::Parallel::WorkQueue::Channel.read(@res_read) end |
#reap ⇒ Object
79 80 81 82 83 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 79 def reap Process.wait(@pid) rescue Errno::ECHILD nil end |
#res_io ⇒ Object
49 50 51 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 49 def res_io @res_read end |
#retire ⇒ Object
85 86 87 88 89 90 91 92 93 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 85 def retire shutdown busy, wall = drain_stats close_pipes reap @busy_time = busy @wall_time = wall to_stat end |
#send_item(index, item) ⇒ Object
53 54 55 56 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 53 def send_item(index, item) Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, [index, item]) @pending += 1 end |
#shutdown ⇒ Object
62 63 64 65 66 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 62 def shutdown Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, Evilution::Parallel::WorkQueue::SHUTDOWN) rescue Errno::EPIPE nil end |
#to_stat ⇒ Object
95 96 97 98 99 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 95 def to_stat Evilution::Parallel::WorkQueue::WorkerStat.new( @pid, @items_completed, @busy_time || 0.0, @wall_time || 0.0 ) end |