Class: Evilution::Parallel::WorkQueue::Worker
- Inherits: Object
  - Object
  - Evilution::Parallel::WorkQueue::Worker
- Defined in: lib/evilution/parallel/work_queue/worker.rb
Defined Under Namespace
Instance Attribute Summary
- #busy_time ⇒ Object
  Returns the value of attribute busy_time.
- #deadline ⇒ Object
  Returns the value of attribute deadline.
- #in_flight_indices ⇒ Object (readonly)
  Returns the value of attribute in_flight_indices.
- #items_completed ⇒ Object
  Returns the value of attribute items_completed.
- #pending ⇒ Object
  Returns the value of attribute pending.
- #pid ⇒ Object (readonly)
  Returns the value of attribute pid.
- #wall_time ⇒ Object
  Returns the value of attribute wall_time.
- #worker_index ⇒ Object (readonly)
  Returns the value of attribute worker_index.
Class Method Summary
- .install_child_signal_handlers ⇒ Object
  EV-7a91: a worker is the parent of the inner per-mutation Fork children it spawns, and those children are their own process-group leaders (EV-2sh8), so the Runner’s group-kill of the worker never reaches them.
- .spawn(worker_index:, hooks:, supervisor: Evilution::ProcessSupervisor.new, &block) ⇒ Object
  EV-dg69 / EV-5rrh step 3: the supervisor owns the worker’s process-group isolation, signal-safe registry, group-kill and reap.
- .test_env_number_for(worker_index) ⇒ Object
  EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …).
Instance Method Summary
- #close_pipes ⇒ Object
- #initialize(handle:, supervisor:, cmd_write:, res_read:, worker_index:) ⇒ Worker (constructor)
  A new instance of Worker.
- #kill ⇒ Object
  SIGKILL the worker’s whole process group (negative pid), taking down any grandchildren it forked, with the bare pid as a fallback for the case where the group is gone (already reaped, or setpgid did not take).
- #read_result ⇒ Object
- #reap ⇒ Object
  Reap the leader and drop it from the registry so the trap never signals a group whose pid the OS may have recycled.
- #res_io ⇒ Object
- #retire ⇒ Object
- #send_item(index, item) ⇒ Object
- #shutdown ⇒ Object
- #to_stat ⇒ Object
Constructor Details
#initialize(handle:, supervisor:, cmd_write:, res_read:, worker_index:) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 69
def initialize(handle:, supervisor:, cmd_write:, res_read:, worker_index:)
  @handle = handle
  @supervisor = supervisor
  @pid = handle.pid
  @cmd_write = cmd_write
  @res_read = res_read
  @worker_index = worker_index
  @items_completed = 0
  @pending = 0
  @busy_time = 0.0
  @wall_time = 0.0
  @in_flight_indices = []
  @deadline = nil
end
Instance Attribute Details
#busy_time ⇒ Object
Returns the value of attribute busy_time.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def busy_time
  @busy_time
end
#deadline ⇒ Object
Returns the value of attribute deadline.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def deadline
  @deadline
end
#in_flight_indices ⇒ Object (readonly)
Returns the value of attribute in_flight_indices.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13
def in_flight_indices
  @in_flight_indices
end
#items_completed ⇒ Object
Returns the value of attribute items_completed.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def items_completed
  @items_completed
end
#pending ⇒ Object
Returns the value of attribute pending.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def pending
  @pending
end
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13
def pid
  @pid
end
#wall_time ⇒ Object
Returns the value of attribute wall_time.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14
def wall_time
  @wall_time
end
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13
def worker_index
  @worker_index
end
Class Method Details
.install_child_signal_handlers ⇒ Object
EV-7a91: a worker is the parent of the inner per-mutation Fork children it spawns, and those children are their own process-group leaders (EV-2sh8), so the Runner’s group-kill of the worker never reaches them. On a terminal INT/TERM the worker must therefore tear down AND reap the inner children it owns before it dies, or they survive as zombies (their parent gone) until an ancestor exits. cleanup_all clears any per-mutation sandbox dirs the inner children registered in this worker’s TempDirTracker.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 50
def self.install_child_signal_handlers
  %w[INT TERM].each do |sig|
    Signal.trap(sig) do
      Evilution::TempDirTracker.cleanup_all
      Evilution::ProcessSupervisor.kill_and_reap_all
      Signal.trap(sig, "DEFAULT")
      Process.kill(sig, Process.pid)
    end
  end
end
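The handler above follows a "clean up, restore the default disposition, re-send the signal" pattern, so the process still terminates by signal as far as its parent can tell. The following is a minimal sketch of that pattern only. The helper names `install_cleanup_then_die` and `demo_signal_exit` are hypothetical, and a single pipe write stands in for the real `cleanup_all` and `kill_and_reap_all` calls.

```ruby
# Sketch: run cleanup on INT/TERM, then restore the default disposition and
# re-send the signal so the process still dies "by signal" for its parent.
# install_cleanup_then_die is a hypothetical helper, not part of Evilution.
def install_cleanup_then_die(io)
  %w[INT TERM].each do |sig|
    Signal.trap(sig) do
      io.syswrite("cleaned up\n") # stand-in for the real cleanup calls
      Signal.trap(sig, "DEFAULT")
      Process.kill(sig, Process.pid)
    end
  end
end

# Forks a child that installs the handler, sends it TERM, and returns what
# the child wrote during cleanup together with its exit status.
def demo_signal_exit
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    install_cleanup_then_die(writer)
    writer.syswrite("ready\n") # tell the parent the trap is installed
    sleep
  end
  writer.close
  reader.gets # wait for "ready" before signalling
  Process.kill("TERM", pid)
  _, status = Process.wait2(pid)
  [reader.read, status]
end

note, status = demo_signal_exit
puts note
puts "signaled: #{status.signaled?}"
```

Re-sending the signal, rather than calling `exit` from the trap, keeps the termination status honest: a parent that inspects `Process::Status#signaled?` still sees a signal death.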
.spawn(worker_index:, hooks:, supervisor: Evilution::ProcessSupervisor.new, &block) ⇒ Object
EV-dg69 / EV-5rrh step 3: the supervisor owns the worker’s process-group isolation, signal-safe registry, group-kill and reap. spawn passes isolate_in_child: false so the worker becomes its own group leader only parent-side, AFTER the supervisor has registered it – preserving the EV-jwao register-before-isolate ordering (the trap can never see a leader missing from the registry). EV-cnx8 group-leadership (so #kill sweeps the whole subtree) is still established, now by the supervisor’s parent-side setpgid.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 24
def self.spawn(worker_index:, hooks:, supervisor: Evilution::ProcessSupervisor.new, &block)
  cmd_read, cmd_write = IO.pipe
  res_read, res_write = IO.pipe
  [cmd_read, cmd_write, res_read, res_write].each(&:binmode)

  handle = supervisor.spawn(isolate_in_child: false) do
    cmd_write.close
    res_read.close
    install_child_signal_handlers
    ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index)
    Evilution::ChildOutput.redirect!
    Loop.run(cmd_read, res_write, hooks: hooks, &block)
  end

  cmd_read.close
  res_write.close
  new(handle:, supervisor:, cmd_write:, res_read:, worker_index:)
end
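The pipe handling in spawn (two pipes, each side closing the ends it does not use) can be tried in isolation. The sketch below is illustrative only: plain `fork` stands in for `supervisor.spawn`, `Marshal` stands in for whatever wire format `Channel` actually uses, and `spawn_echo_worker` and `round_trip` are hypothetical names.

```ruby
# Sketch of the two-pipe parent/child layout. Assumptions: plain fork
# replaces supervisor.spawn, and Marshal replaces the real Channel format.
def spawn_echo_worker
  cmd_read, cmd_write = IO.pipe
  res_read, res_write = IO.pipe
  [cmd_read, cmd_write, res_read, res_write].each(&:binmode)

  pid = fork do
    # Child: close the parent's ends so EOF on cmd_read is delivered once
    # the parent closes its write end.
    cmd_write.close
    res_read.close
    until cmd_read.eof?
      item = Marshal.load(cmd_read)
      Marshal.dump(item.to_s.upcase, res_write)
      res_write.flush
    end
    exit!(0)
  end

  # Parent: close the child's ends.
  cmd_read.close
  res_write.close
  [pid, cmd_write, res_read]
end

# Sends one item to a fresh worker and returns its reply.
def round_trip(text)
  pid, cmd_write, res_read = spawn_echo_worker
  Marshal.dump(text, cmd_write)
  cmd_write.flush
  reply = Marshal.load(res_read)
  cmd_write.close # EOF on the command pipe lets the child's loop end
  res_read.close
  Process.wait(pid)
  reply
end

puts round_trip("hello")
```

Closing the unused ends matters: if the child kept its copy of `cmd_write` open, it would never observe EOF on `cmd_read` after the parent closed its own copy.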
.test_env_number_for(worker_index) ⇒ Object
EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 65
def self.test_env_number_for(worker_index)
  worker_index.zero? ? "" : (worker_index + 1).to_s
end
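To make the slot mapping concrete, here is a small stand-alone sketch. The method body is copied from the listing above. The `database_for` helper and the `db/test….sqlite3` naming are assumptions for illustration; a Rails app would normally interpolate `ENV["TEST_ENV_NUMBER"]` in database.yml instead.

```ruby
# Stand-alone copy of the slot-to-suffix rule shown above.
def test_env_number_for(worker_index)
  worker_index.zero? ? "" : (worker_index + 1).to_s
end

# Hypothetical helper: builds a per-worker SQLite path from the suffix.
def database_for(worker_index)
  "db/test#{test_env_number_for(worker_index)}.sqlite3"
end

(0..3).each do |slot|
  puts "slot #{slot} -> #{database_for(slot)}"
end
```

Slot 0 keeps the unsuffixed name, and the numbering then starts at 2, so there is never a suffix "1".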
Instance Method Details
#close_pipes ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 111
def close_pipes
  @cmd_write.close unless @cmd_write.closed?
  @res_read.close unless @res_read.closed?
end
#kill ⇒ Object
SIGKILL the worker’s whole process group (negative pid), taking down any grandchildren it forked, with the bare pid as a fallback for the case where the group is gone (already reaped, or setpgid did not take).
# File 'lib/evilution/parallel/work_queue/worker.rb', line 107
def kill
  @supervisor.signal_group("KILL", @handle)
end
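The group-kill with a bare-pid fallback described above lives in the supervisor's `signal_group`, which is not shown on this page. The following is therefore only a sketch of the general technique, assuming plain `Process.kill` and a parent-side `Process.setpgid`. `kill_group_or_pid` and `demo_group_kill` are hypothetical names.

```ruby
# Sketch: SIGKILL a whole process group, falling back to the bare pid when
# the group no longer exists. Not the supervisor's actual implementation.
def kill_group_or_pid(pid)
  Process.kill("KILL", -pid) # a negative pid targets the group led by pid
  :group
rescue Errno::ESRCH
  begin
    Process.kill("KILL", pid) # group is gone; try the single process
    :pid
  rescue Errno::ESRCH
    :gone # nothing left to signal
  end
end

# Forks a sleeping child, makes it a group leader from the parent side,
# kills it via its group, reaps it, then tries again on the dead pid.
def demo_group_kill
  pid = fork { sleep }
  Process.setpgid(pid, pid) # parent-side: child becomes its own group leader
  first = kill_group_or_pid(pid)
  _, status = Process.wait2(pid)
  second = kill_group_or_pid(pid)
  [first, status.termsig, second]
end

p demo_group_kill
```

The second call in the demo exercises the fallback path: once the leader has been reaped, both the group and the bare pid raise `Errno::ESRCH`.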
#read_result ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 94
def read_result
  Evilution::Parallel::WorkQueue::Channel.read(@res_read)
end
#reap ⇒ Object
Reap the leader and drop it from the registry so the trap never signals a group whose pid the OS may have recycled. ECHILD-tolerant; unregister is a no-op if it was never registered.
# File 'lib/evilution/parallel/work_queue/worker.rb', line 119
def reap
  @supervisor.reap(@handle)
end
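An ECHILD-tolerant reap with an idempotent unregister can be sketched as follows. The supervisor's real registry is not shown on this page, so a plain `Set` stands in for it here, and `reap_and_unregister` and `demo_reap` are hypothetical names.

```ruby
require "set"

# Sketch: wait for a child, tolerate "no such child", and always drop the
# pid from a registry. The Set is an assumed stand-in for the real registry.
def reap_and_unregister(pid, registry)
  Process.wait(pid)
  true
rescue Errno::ECHILD
  false # already reaped, or never a child of this process
ensure
  registry.delete(pid) # Set#delete does nothing for a missing member
end

# Reaps the same child twice to show that the second call is harmless.
def demo_reap
  registry = Set.new
  pid = fork { exit!(0) }
  registry << pid
  first = reap_and_unregister(pid, registry)
  second = reap_and_unregister(pid, registry)
  [first, second, registry.empty?]
end

p demo_reap
```

Dropping the pid in `ensure` means the registry never keeps an entry for a process that is known to be gone, whichever branch was taken.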
#res_io ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 84
def res_io
  @res_read
end
#retire ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 123
def retire
  shutdown
  timing = drain_stats
  close_pipes
  reap
  @busy_time = timing.busy
  @wall_time = timing.wall
  to_stat
end
#send_item(index, item) ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 88
def send_item(index, item)
  Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, [index, item])
  @pending += 1
  @in_flight_indices << index
end
#shutdown ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 98
def shutdown
  Evilution::Parallel::WorkQueue::Channel.write(@cmd_write, Evilution::Parallel::WorkQueue::SHUTDOWN)
rescue Errno::EPIPE
  nil
end
#to_stat ⇒ Object
# File 'lib/evilution/parallel/work_queue/worker.rb', line 133
def to_stat
  Evilution::Parallel::WorkQueue::WorkerStat.new(
    @pid, @items_completed, @busy_time || 0.0, @wall_time || 0.0
  )
end