Class: MainLoop::Loop
- Inherits:
-
Object
- Object
- MainLoop::Loop
- Defined in:
- lib/main_loop/loop.rb
Overview
MainLoop::Loop
Главный цикл управления, запускает обработку сигналов, обрабатывает события из Bus.
Жизненный цикл
-
#run устанавливает #install_signal_handlers
-
#start_loop_forever запускает цикл обработки событий
-
События из Bus обрабатываются через case:
-
Dispatcher#reap получает завершенные процессы
-
Dispatcher#tick проверяет необходимость принудительного завершения
Пример использования
bus = MainLoop::Bus.new
dispatcher = MainLoop::Dispatcher.new(bus, timeout: 10)
loop = MainLoop::Loop.new(bus, dispatcher)
loop.run(30) # запуск с таймаутом 30 секунд
См. также
-
Bus — канал событий
-
Dispatcher — координирует обработчики
-
ProcessHandler — обработчики процессов
-
ThreadHandler — обработчики потоков
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#crash(_command) ⇒ Object
Отправить сигнал аварийного завершения.
-
#initialize(bus, dispatcher, logger: nil) ⇒ Loop
constructor
Инициализация.
-
#install_signal_handlers(bus) ⇒ Object
Установка обработчиков сигналов.
-
#reap(command) ⇒ Object
Обработка завершения процесса.
-
#reap_children ⇒ Array<Array>
Сбор завершенных процессов.
-
#run(timeout = 0) ⇒ Object
Запуск цикла.
-
#signal(command) ⇒ Object
Обработка сигнала.
-
#start_loop_forever(timeout = 0) ⇒ Object
Главный цикл обработки событий.
-
#term(_command) ⇒ Object
Инициировать терминацию.
-
#wait2(pid) ⇒ Array<Integer, Process::Status>|nil
Ожидание завершения процесса.
Constructor Details
#initialize(bus, dispatcher, logger: nil) ⇒ Loop
Инициализация
48 49 50 51 52 53 54 |
# File 'lib/main_loop/loop.rb', line 48 def initialize(bus, dispatcher, logger: nil) STDOUT.sync = true STDERR.sync = true @bus = bus @dispatcher = dispatcher @logger = logger || Logger.new(nil) end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
41 42 43 |
# File 'lib/main_loop/loop.rb', line 41 def logger @logger end |
Instance Method Details
#crash(_command) ⇒ Object
Отправить сигнал аварийного завершения
Передает команду crash диспетчеру.
159 160 161 |
# File 'lib/main_loop/loop.rb', line 159 def crash(_command) @dispatcher.crash end |
#install_signal_handlers(bus) ⇒ Object
Установка обработчиков сигналов
Устанавливает trap для TERM, INT и CLD. Сигналы отправляются в Bus через отдельные потоки.
:nocov:
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/main_loop/loop.rb', line 116 def install_signal_handlers(bus) TERM_SIGNALS.each do |sig| trap(sig) do |*_args| Thread.new(bus) {|b| b.puts "sig:#{sig}" } end end trap 'CLD' do Thread.new(bus) {|b| b.puts 'sig:CLD' } end end |
#reap(command) ⇒ Object
Обработка завершения процесса
Парсит команду “reap:id:status” и отправляет в диспетчер.
168 169 170 171 |
# File 'lib/main_loop/loop.rb', line 168 def reap(command) _, id, status = command.split(':') @dispatcher.reap_by_id(id, status) end |
#reap_children ⇒ Array<Array>
Сбор завершенных процессов
Проходит по всем PID обработчиков и собирает их статусы через wait2. Дополнительно собирает все оставшиеся дочерние процессы (wait2(-1)).
Особенности обработки ECHILD
Если процесс завершился и был “съеден” другой системой (например, родительский процесс вызвал Process.wait в on_term обработчике), то Process.wait2(pid) вызовет Errno::ECHILD. Это нормальное поведение в Unix/Linux когда PID больше не существует в таблице процессов.
В этом случае:
-
Мы добавляем [pid, nil] в результат, чтобы отметить, что процесс не найден
-
Обработка продолжается для остальных процессов в списке
-
Это предотвращает “зависание” обработки всех остальных процессов
Пример сценария (см. test_process.rb):
-
ProcessHandler запускает процесс с PID 123
-
При терминации вызывается on_term(pid) в обработчике
-
on_term вызывает Process.wait(pid) и “съедает” статус
-
Позже reap_children пытается wait2(123) и получает ECHILD
-
Обработка продолжается для других процессов, а 123 помечается как [123, nil]
Логика обработки
Метод проходит по каждому PID из @dispatcher.pids:
-
wait2(pid) возвращает [pid, status] если процесс найден
-
wait2(pid) возвращает nil если процесс еще не завершился
-
wait2(pid) вызывает Errno::ECHILD если PID уже не существует
Для каждого случая:
-
Нам возвращается [pid, status] -> добавляем в results
-
Возвращается nil -> ничего не добавляем (не завершился)
-
ECHILD -> добавляем [pid, nil] (PID не найден, съеден другой системой)
После обработки всех известных PID, делается wait2(-1) для сбора любых оставшихся дочерних процессов (с таймаутом 2 секунды).
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/main_loop/loop.rb', line 212 def reap_children results = [] @dispatcher.pids.each do |pid| begin if (result = self.wait2(pid)) results << result end rescue Errno::ECHILD # Процесс "съеден" другой системой (например, Process.wait вызван в on_term) # или процесс уже завершился и pid больше не существует # Добавляем [pid, nil] чтобы отметить его и продолжить обработку остальных results << [pid, nil] end end Timeouter.loop(2) do unless (result = self.wait2(-1)) break end results << result end results rescue Errno::ECHILD results end |
#run(timeout = 0) ⇒ Object
Запуск цикла
Устанавливает обработчики сигналов и запускает #start_loop_forever.
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/main_loop/loop.rb', line 62 def run(timeout = 0) install_signal_handlers(@bus) start_loop_forever(timeout) rescue StandardError => e # :nocov: logger.fatal("Exception in Main Loop: #{e.inspect}") exit!(2) # :nocov: end |
#signal(command) ⇒ Object
Обработка сигнала
132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/main_loop/loop.rb', line 132 def signal(command) _, sig = command.split(':') logger.debug("signal:#{sig}") if TERM_SIGNALS.include?(sig) @dispatcher.term elsif sig == 'CLD' # nothing to do child will reap later else logger.info("unhandled signal:#{sig}") end end |
#start_loop_forever(timeout = 0) ⇒ Object
Главный цикл обработки событий
Цикл с ограниченным временем работы (через Timeouter).
Интервал ожидания событий:
wait = [[(timeout / 2.5), 5].min, 5].max
Минимум 5 секунд (даже при timeout = 0)
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/main_loop/loop.rb', line 82 def start_loop_forever(timeout = 0) # TODO поскольку wait всегда равен 5 секунд, то цикл работы 5 секунд, и потому # timeout для Dispatcher нужно ставить больше 2 циклов, чтобы успели завершиться все потоки или процессы wait = [[(timeout / 2.5), 5].min, 5].max Timeouter.loop(timeout) do event = @bus.gets(wait) logger.debug("command:#{event}") case event when 'term' term(event) when 'crash' crash(event) when /sig:/ signal(event) when /reap:/ reap(event) when nil logger.debug('Empty event: reaping...') else logger.debug("unknown event:#{event}") end @dispatcher.reap(reap_children) rescue nil @dispatcher.tick end end |
#term(_command) ⇒ Object
Инициировать терминацию
Передает команду терминации диспетчеру (если не уже терминация).
150 151 152 |
# File 'lib/main_loop/loop.rb', line 150 def term(_command) @dispatcher.term unless @dispatcher.terminating? end |
#wait2(pid) ⇒ Array<Integer, Process::Status>|nil
Ожидание завершения процесса
Обертка для Process.wait2 с флагом WNOHANG.
:nocov:
248 249 250 |
# File 'lib/main_loop/loop.rb', line 248 def wait2(pid) Process.wait2(pid, ::Process::WNOHANG) end |