Class: Evilution::Parallel::WorkQueue::Worker
- Inherits:
-
Object
- Object
- Evilution::Parallel::WorkQueue::Worker
- Defined in:
- lib/evilution/parallel/work_queue/worker.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#busy_time ⇒ Object
Returns the value of attribute busy_time.
-
#deadline ⇒ Object
Returns the value of attribute deadline.
-
#in_flight_indices ⇒ Object
readonly
Returns the value of attribute in_flight_indices.
-
#items_completed ⇒ Object
Returns the value of attribute items_completed.
-
#pending ⇒ Object
Returns the value of attribute pending.
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
-
#wall_time ⇒ Object
Returns the value of attribute wall_time.
-
#worker_index ⇒ Object
readonly
Returns the value of attribute worker_index.
Class Method Summary collapse
-
.isolate_process_group(pid) ⇒ Object
EV-cnx8 / GH #1324: make the worker its own process-group leader so #kill can signal the whole subtree.
- .spawn(worker_index:, hooks:, &block) ⇒ Object
-
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …).
Instance Method Summary collapse
- #close_pipes ⇒ Object
-
#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker
constructor
A new instance of Worker.
-
#kill ⇒ Object
SIGKILL the worker’s whole process group (negative pid), reaping any grandchildren it forked.
- #kill_pid ⇒ Object
- #read_result ⇒ Object
- #reap ⇒ Object
- #res_io ⇒ Object
- #retire ⇒ Object
- #send_item(index, item) ⇒ Object
- #shutdown ⇒ Object
- #to_stat ⇒ Object
Constructor Details
#initialize(pid:, cmd_write:, res_read:, worker_index:) ⇒ Worker
Returns a new instance of Worker.
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 70 def initialize(pid:, cmd_write:, res_read:, worker_index:) @pid = pid @cmd_write = cmd_write @res_read = res_read @worker_index = worker_index @items_completed = 0 @pending = 0 @busy_time = 0.0 @wall_time = 0.0 @in_flight_indices = [] @deadline = nil end |
Instance Attribute Details
#busy_time ⇒ Object
Returns the value of attribute busy_time.
13 14 15 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13 def busy_time @busy_time end |
#deadline ⇒ Object
Returns the value of attribute deadline.
13 14 15 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13 def deadline @deadline end |
#in_flight_indices ⇒ Object (readonly)
Returns the value of attribute in_flight_indices.
12 13 14 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12 def in_flight_indices @in_flight_indices end |
#items_completed ⇒ Object
Returns the value of attribute items_completed.
13 14 15 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13 def items_completed @items_completed end |
#pending ⇒ Object
Returns the value of attribute pending.
13 14 15 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13 def pending @pending end |
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
12 13 14 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12 def pid @pid end |
#wall_time ⇒ Object
Returns the value of attribute wall_time.
13 14 15 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13 def wall_time @wall_time end |
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
12 13 14 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 12 def worker_index @worker_index end |
Class Method Details
.isolate_process_group(pid) ⇒ Object
EV-cnx8 / GH #1324: make the worker its own process-group leader so #kill can signal the whole subtree. A mutation’s spec may fork a grandchild that blocks (e.g. ConditionVariable#wait); when the dispatcher SIGKILLs a stuck worker, that grandchild must die with it rather than orphan to init holding memory/fds/connections. Done parent-side (before the child forks anything) so a failure is visible here instead of being swallowed in the child.
51 52 53 54 55 56 57 58 59 60 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 51 def self.isolate_process_group(pid) Process.setpgid(pid, pid) rescue Errno::EACCES, Errno::ESRCH # EACCES: child already exec'd/changed group; ESRCH: child already exited. # Both are benign -- reaping handles the child either way. nil rescue SystemCallError => e warn "evilution: could not isolate worker #{pid} into its own process " \ "group (#{e.class}: #{e.}); grandchildren may survive a kill." end |
.spawn(worker_index:, hooks:, &block) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 15 def self.spawn(worker_index:, hooks:, &block) cmd_read, cmd_write = IO.pipe res_read, res_write = IO.pipe [cmd_read, cmd_write, res_read, res_write].each(&:binmode) pid = Process.fork do cmd_write.close res_read.close ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index) Evilution::ChildOutput.redirect! Loop.run(cmd_read, res_write, hooks: hooks, &block) end cmd_read.close res_write.close # Register BEFORE isolating so the trap can never observe a worker that is # already its own group leader yet missing from the registry (EV-jwao race, # GH #1333 review): the spawn runs on the same main thread the trap # interrupts, so a signal arriving between setpgid and register would # otherwise leak a leader the trap cannot reach. Ordering register first # leaves only safe windows -- pre-setpgid the child still shares the parent # group and receives the terminal signal directly; once it is its own # leader the registry already lists it. Registering unconditionally is safe # because signal_all's kill(-pid) is a no-op (Errno::ESRCH) for a pid that # never became a group leader (setpgid failed). Evilution::Parallel::WorkQueue::WorkerRegistry.register(pid) isolate_process_group(pid) new(pid:, cmd_write:, res_read:, worker_index:) end |
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.
66 67 68 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 66 def self.test_env_number_for(worker_index) worker_index.zero? ? "" : (worker_index + 1).to_s end |
Instance Method Details
#close_pipes ⇒ Object
118 119 120 121 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 118 def close_pipes @cmd_write.close unless @cmd_write.closed? @res_read.close unless @res_read.closed? end |
#kill ⇒ Object
SIGKILL the worker’s whole process group (negative pid), reaping any grandchildren it forked. Falls back to the single pid if the group is gone – already reaped, or setpgid did not take in the child.
106 107 108 109 110 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 106 def kill Process.kill("KILL", -@pid) rescue Errno::ESRCH kill_pid end |
#kill_pid ⇒ Object
112 113 114 115 116 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 112 def kill_pid Process.kill("KILL", @pid) rescue Errno::ESRCH nil end |
#read_result ⇒ Object
93 94 95 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 93 def read_result Evilution::Parallel::WorkQueue::Channel.read(@res_read) end |
#reap ⇒ Object
123 124 125 126 127 128 129 130 131 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 123 def reap Process.wait(@pid) rescue Errno::ECHILD nil ensure # Drop the pgid once the leader is reaped so the trap never signals a group # whose pid the OS may have recycled. No-op if it was never registered. Evilution::Parallel::WorkQueue::WorkerRegistry.unregister(@pid) end |
#res_io ⇒ Object
83 84 85 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 83 def res_io @res_read end |
#retire ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 133 def retire shutdown timing = drain_stats close_pipes reap @busy_time = timing.busy @wall_time = timing.wall to_stat end |
#send_item(index, item) ⇒ Object
87 88 89 90 91 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 87 def send_item(index, item) Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, [index, item]) @pending += 1 @in_flight_indices << index end |
#shutdown ⇒ Object
97 98 99 100 101 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 97 def shutdown Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, Evilution::Parallel::WorkQueue::SHUTDOWN) rescue Errno::EPIPE nil end |
#to_stat ⇒ Object
143 144 145 146 147 |
# File 'lib/evilution/parallel/work_queue/worker.rb', line 143 def to_stat Evilution::Parallel::WorkQueue::WorkerStat.new( @pid, @items_completed, @busy_time || 0.0, @wall_time || 0.0 ) end |