Class: Evilution::Parallel::WorkQueue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/evilution/parallel/work_queue/worker.rb

Defined Under Namespace

Modules: Loop
Classes: Timing

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(handle:, supervisor:, cmd_write:, res_read:, worker_index:) ⇒ Worker

Returns a new instance of Worker.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/evilution/parallel/work_queue/worker.rb', line 69

# Builds the parent-side record for one spawned worker process.
#
# @param handle [#pid] process handle; kept in @handle and its pid cached in @pid
# @param supervisor [Object] kept so #kill and #reap can delegate to it
# @param cmd_write [IO] pipe end that #send_item and #shutdown write to
# @param res_read [IO] pipe end that #read_result reads from
# @param worker_index [Integer] slot number of this worker
def initialize(handle:, supervisor:, cmd_write:, res_read:, worker_index:)
  # Process identity and the supervisor that manages it.
  @handle = handle
  @supervisor = supervisor
  @pid = handle.pid

  # Parent-side pipe ends and the slot number.
  @cmd_write = cmd_write
  @res_read = res_read
  @worker_index = worker_index

  # Bookkeeping starts empty: nothing sent, nothing completed, no time accrued.
  @items_completed = @pending = 0
  @busy_time = @wall_time = 0.0
  @in_flight_indices = []
  @deadline = nil
end

Instance Attribute Details

#busy_time ⇒ Object

Returns the value of attribute busy_time.



14
15
16
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Reader for @busy_time: 0.0 after construction, replaced in #retire by the
# drained timing's busy value.
def busy_time = @busy_time

#deadline ⇒ Object

Returns the value of attribute deadline.



14
15
16
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Reader for @deadline, which the constructor leaves as nil.
def deadline = @deadline

#in_flight_indices ⇒ Object (readonly)

Returns the value of attribute in_flight_indices.



13
14
15
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13

# Reader for @in_flight_indices, the array #send_item appends each dispatched index to.
# Returns the live array, not a copy.
def in_flight_indices = @in_flight_indices

#items_completed ⇒ Object

Returns the value of attribute items_completed.



14
15
16
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Reader for @items_completed: starts at 0 in the constructor and is passed to
# WorkerStat by #to_stat.
def items_completed = @items_completed

#pending ⇒ Object

Returns the value of attribute pending.



14
15
16
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Reader for @pending: starts at 0 and grows by one for every item #send_item writes.
def pending = @pending

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



13
14
15
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13

# Reader for @pid, cached from handle.pid when the worker object was built.
def pid = @pid

#wall_time ⇒ Object

Returns the value of attribute wall_time.



14
15
16
# File 'lib/evilution/parallel/work_queue/worker.rb', line 14

# Reader for @wall_time: 0.0 after construction, replaced in #retire by the
# drained timing's wall value.
def wall_time = @wall_time

#worker_index ⇒ Object (readonly)

Returns the value of attribute worker_index.



13
14
15
# File 'lib/evilution/parallel/work_queue/worker.rb', line 13

# Reader for @worker_index, the worker_index keyword given to the constructor.
def worker_index = @worker_index

Class Method Details

.install_child_signal_handlers ⇒ Object

EV-7a91: a worker is the parent of the inner per-mutation Fork children it spawns, and those children are their own process-group leaders (EV-2sh8), so the Runner’s group-kill of the worker never reaches them. On a terminal INT/TERM the worker must therefore tear down AND reap the inner children it owns before it dies, or they survive as zombies (their parent gone) until an ancestor exits. cleanup_all clears any per-mutation sandbox dirs the inner children registered in this worker’s TempDirTracker.



50
51
52
53
54
55
56
57
58
59
# File 'lib/evilution/parallel/work_queue/worker.rb', line 50

# Installs the same teardown handler for INT and TERM.
#
# On delivery the handler, in order: clears the tracked temp dirs, kills and reaps
# every supervised child, restores Ruby's default handling for that signal, and
# re-sends the signal to this process so the default behaviour then takes over.
#
# @return [Array<String>] the signal names that were trapped
def self.install_child_signal_handlers
  %w[INT TERM].each do |signal_name|
    # A proc (not a lambda) so the signal number Ruby passes to handlers is ignored.
    teardown = proc do
      Evilution::TempDirTracker.cleanup_all
      Evilution::ProcessSupervisor.kill_and_reap_all
      Signal.trap(signal_name, "DEFAULT")
      Process.kill(signal_name, Process.pid)
    end

    Signal.trap(signal_name, teardown)
  end
end

.spawn(worker_index:, hooks:, supervisor: Evilution::ProcessSupervisor.new, &block) ⇒ Object

EV-dg69 / EV-5rrh step 3: the supervisor owns the worker’s process-group isolation, signal-safe registry, group-kill and reap. spawn passes isolate_in_child: false so the worker becomes its own group leader only parent-side, AFTER the supervisor has registered it – preserving the EV-jwao register-before-isolate ordering (the trap can never see a leader missing from the registry). EV-cnx8 group-leadership (so #kill sweeps the whole subtree) is still established, now by the supervisor’s parent-side setpgid.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/evilution/parallel/work_queue/worker.rb', line 24

# Starts one worker through +supervisor+ and returns its parent-side Worker.
#
# Two binary-mode pipes are opened first: a command pipe (parent writes, worker
# reads) and a result pipe (worker writes, parent reads). The block handed to
# supervisor.spawn is the worker's body: it drops the parent's pipe ends, installs
# the INT/TERM handlers, sets TEST_ENV_NUMBER for its slot, redirects child output
# and enters Loop.run. The parent then drops the worker's pipe ends and wraps the
# remaining two in a Worker.
#
# If anything raises along the way (opening the second pipe, supervisor.spawn,
# building the Worker), every pipe end opened so far is closed before the error is
# re-raised, rather than staying open until garbage collection.
#
# @param worker_index [Integer] slot number; also drives TEST_ENV_NUMBER
# @param hooks [Object] forwarded to Loop.run
# @param supervisor [#spawn] spawns the process; defaults to a new ProcessSupervisor
# @yield forwarded to Loop.run inside the worker
# @return [Worker] parent-side handle for the spawned worker
# @raise [StandardError] whatever IO.pipe, supervisor.spawn or Worker.new raised
def self.spawn(worker_index:, hooks:, supervisor: Evilution::ProcessSupervisor.new, &block)
  pipes = []
  pipes.concat(IO.pipe) # command pipe: [cmd_read, cmd_write]
  pipes.concat(IO.pipe) # result pipe:  [res_read, res_write]
  cmd_read, cmd_write, res_read, res_write = pipes
  pipes.each(&:binmode)

  handle = supervisor.spawn(isolate_in_child: false) do
    # Worker side: the parent's ends are not needed here.
    cmd_write.close
    res_read.close
    install_child_signal_handlers
    ENV["TEST_ENV_NUMBER"] = test_env_number_for(worker_index)
    Evilution::ChildOutput.redirect!
    Loop.run(cmd_read, res_write, hooks: hooks, &block)
  end

  # Parent side: the worker's ends are not needed here.
  cmd_read.close
  res_write.close
  new(handle:, supervisor:, cmd_write:, res_read:, worker_index:)
rescue StandardError
  # IO#close on an already-closed stream is a no-op, so closing every end is safe.
  pipes&.each(&:close)
  raise
end

.test_env_number_for(worker_index) ⇒ Object

EV-kdns / GH #817: translate 0-based worker slot to parallel_tests’ TEST_ENV_NUMBER convention (“” for slot 0, “2” for slot 1, …). Rails apps interpolating TEST_ENV_NUMBER into database.yml get per-worker SQLite files, avoiding lock contention under jobs > 1.



65
66
67
# File 'lib/evilution/parallel/work_queue/worker.rb', line 65

# Maps a worker slot to its TEST_ENV_NUMBER string: slot 0 gets the empty string,
# every other slot gets its index plus one ("2" for slot 1, "3" for slot 2, ...).
#
# @param worker_index [Integer] 0-based worker slot
# @return [String]
def self.test_env_number_for(worker_index)
  return "" if worker_index.zero?

  (worker_index + 1).to_s
end

Instance Method Details

#close_pipes ⇒ Object



111
112
113
114
# File 'lib/evilution/parallel/work_queue/worker.rb', line 111

# Closes the parent's two pipe ends (command writer first, then result reader),
# skipping any end that is already closed.
#
# @return [nil]
def close_pipes
  [@cmd_write, @res_read].each do |pipe_end|
    pipe_end.close unless pipe_end.closed?
  end
  nil
end

#kill ⇒ Object

SIGKILL the worker’s whole process group (negative pid), reaping any grandchildren it forked, with the bare pid as a fallback for the case where the group is gone (already reaped, or setpgid did not take).



107
108
109
# File 'lib/evilution/parallel/work_queue/worker.rb', line 107

# Asks the supervisor to send "KILL" to this worker's handle via signal_group and
# returns whatever that call returns.
def kill = @supervisor.signal_group("KILL", @handle)

#read_result ⇒ Object



94
95
96
# File 'lib/evilution/parallel/work_queue/worker.rb', line 94

# Reads the next message from the result pipe by delegating to Channel.read and
# returns whatever that call returns.
def read_result
  channel = Evilution::Parallel::WorkQueue::Channel
  channel.read(@res_read)
end

#reap ⇒ Object

Reap the leader and drop it from the registry so the trap never signals a group whose pid the OS may have recycled. ECHILD-tolerant; unregister is a no-op if it was never registered.



119
120
121
# File 'lib/evilution/parallel/work_queue/worker.rb', line 119

# Hands this worker's handle to the supervisor's reap and returns its result.
def reap = @supervisor.reap(@handle)

#res_io ⇒ Object



84
85
86
# File 'lib/evilution/parallel/work_queue/worker.rb', line 84

# Exposes the result-pipe read end (@res_read), the same IO #read_result reads from.
def res_io = @res_read

#retire ⇒ Object



123
124
125
126
127
128
129
130
131
# File 'lib/evilution/parallel/work_queue/worker.rb', line 123

# Winds the worker down and returns its final stat.
#
# Order matters: send the shutdown message, drain the timing the worker reports
# (drain_stats is defined outside this listing), close the parent's pipe ends,
# reap the process, and only then record the busy/wall figures used by #to_stat.
#
# @return [Object] the value of #to_stat
def retire
  shutdown
  final_timing = drain_stats
  close_pipes
  reap

  @busy_time = final_timing.busy
  @wall_time = final_timing.wall
  to_stat
end

#send_item(index, item) ⇒ Object



88
89
90
91
92
# File 'lib/evilution/parallel/work_queue/worker.rb', line 88

# Writes one [index, item] pair to the command pipe, then records it as outstanding:
# @pending goes up by one and the index is appended to @in_flight_indices.
# Nothing is recorded if the write raises.
#
# @param index [Object] identifier remembered in @in_flight_indices
# @param item [Object] payload sent alongside the index
# @return [Array] @in_flight_indices after the append
def send_item(index, item)
  channel = Evilution::Parallel::WorkQueue::Channel
  channel.write(@cmd_write, [index, item])

  @pending += 1
  @in_flight_indices.push(index)
end

#shutdown ⇒ Object



98
99
100
101
102
# File 'lib/evilution/parallel/work_queue/worker.rb', line 98

# Writes the SHUTDOWN message to the command pipe.
#
# @return [Object, nil] Channel.write's result, or nil when the write hit EPIPE
def shutdown
  work_queue = Evilution::Parallel::WorkQueue
  work_queue::Channel.write(@cmd_write, work_queue::SHUTDOWN)
rescue Errno::EPIPE
  # Broken pipe is tolerated on purpose; any other error still propagates.
  nil
end

#to_stat ⇒ Object



133
134
135
136
137
# File 'lib/evilution/parallel/work_queue/worker.rb', line 133

# Snapshots this worker as a WorkerStat built from pid, items completed, busy time
# and wall time (in that order). A nil/false busy or wall time is reported as 0.0.
#
# @return [Evilution::Parallel::WorkQueue::WorkerStat]
def to_stat
  busy = @busy_time || 0.0
  wall = @wall_time || 0.0

  Evilution::Parallel::WorkQueue::WorkerStat.new(@pid, @items_completed, busy, wall)
end