Class: MainLoop::Loop

Inherits:
Object
  • Object
show all
Defined in:
lib/main_loop/loop.rb

Overview

MainLoop::Loop

Главный цикл управления, запускает обработку сигналов, обрабатывает события из Bus.

Жизненный цикл

  1. #run устанавливает #install_signal_handlers

  2. #start_loop_forever запускает цикл обработки событий

  3. События из Bus обрабатываются через case:

  4. Dispatcher#reap получает завершенные процессы

  5. Dispatcher#tick проверяет необходимость принудительного завершения

Пример использования

bus = MainLoop::Bus.new
dispatcher = MainLoop::Dispatcher.new(bus, timeout: 10)
loop = MainLoop::Loop.new(bus, dispatcher)

loop.run(30)  # запуск с таймаутом 30 секунд

См. также

  • Bus — канал событий

  • Dispatcher — координирует обработчики

  • ProcessHandler — обработчики процессов

  • ThreadHandler — обработчики потоков

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bus, dispatcher, logger: nil) ⇒ Loop

Инициализация

Parameters:

  • bus (Bus)

    канал событий

  • dispatcher (Dispatcher)

    диспетчер обработчиков

  • logger (Logger) (defaults to: nil)

    логгер (по умолчанию Logger.new(nil))



48
49
50
51
52
53
54
# File 'lib/main_loop/loop.rb', line 48

def initialize(bus, dispatcher, logger: nil)
  STDOUT.sync = true
  STDERR.sync = true
  @bus = bus
  @dispatcher = dispatcher
  @logger = logger || Logger.new(nil)
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



41
42
43
# File 'lib/main_loop/loop.rb', line 41

def logger
  @logger
end

Instance Method Details

#crash(_command) ⇒ Object

Отправить сигнал аварийного завершения

Передает команду crash диспетчеру.

Parameters:

  • _command (String)

    команда (Unused)



159
160
161
# File 'lib/main_loop/loop.rb', line 159

def crash(_command)
  @dispatcher.crash
end

#install_signal_handlers(bus) ⇒ Object

Установка обработчиков сигналов

Устанавливает trap для TERM, INT и CLD. Сигналы отправляются в Bus через отдельные потоки.

:nocov:



116
117
118
119
120
121
122
123
124
125
126
# File 'lib/main_loop/loop.rb', line 116

def install_signal_handlers(bus)
  TERM_SIGNALS.each do |sig|
    trap(sig) do |*_args|
      Thread.new(bus) {|b| b.puts "sig:#{sig}" }
    end
  end

  trap 'CLD' do
    Thread.new(bus) {|b| b.puts 'sig:CLD' }
  end
end

#reap(command) ⇒ Object

Обработка завершения процесса

Парсит команду “reap:id:status” и отправляет в диспетчер.

Parameters:

  • command (String)

    команда вида “reap:id:status”



168
169
170
171
# File 'lib/main_loop/loop.rb', line 168

def reap(command)
  _, id, status = command.split(':')
  @dispatcher.reap_by_id(id, status)
end

#reap_childrenArray<Array>

Сбор завершенных процессов

Проходит по всем PID обработчиков и собирает их статусы через wait2. Дополнительно собирает все оставшиеся дочерние процессы (wait2(-1)).

Особенности обработки ECHILD

Если процесс завершился и был “съеден” другой системой (например, родительский процесс вызвал Process.wait в on_term обработчике), то Process.wait2(pid) вызовет Errno::ECHILD. Это нормальное поведение в Unix/Linux когда PID больше не существует в таблице процессов.

В этом случае:

  • Мы добавляем [pid, nil] в результат, чтобы отметить, что процесс не найден

  • Обработка продолжается для остальных процессов в списке

  • Это предотвращает “зависание” обработки всех остальных процессов

Пример сценария (см. test_process.rb):

  1. ProcessHandler запускает процесс с PID 123

  2. При терминации вызывается on_term(pid) в обработчике

  3. on_term вызывает Process.wait(pid) и “съедает” статус

  4. Позже reap_children пытается wait2(123) и получает ECHILD

  5. Обработка продолжается для других процессов, а 123 помечается как [123, nil]

Логика обработки

Метод проходит по каждому PID из @dispatcher.pids:

  • wait2(pid) возвращает [pid, status] если процесс найден

  • wait2(pid) возвращает nil если процесс еще не завершился

  • wait2(pid) вызывает Errno::ECHILD если PID уже не существует

Для каждого случая:

  • Нам возвращается [pid, status] -> добавляем в results

  • Возвращается nil -> ничего не добавляем (не завершился)

  • ECHILD -> добавляем [pid, nil] (PID не найден, съеден другой системой)

После обработки всех известных PID, делается wait2(-1) для сбора любых оставшихся дочерних процессов (с таймаутом 2 секунды).

Returns:

  • (Array<Array>)

    массив пар (pid, status)



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/main_loop/loop.rb', line 212

def reap_children
  results = []

  @dispatcher.pids.each do |pid|
    begin
      if (result = self.wait2(pid))
        results << result
      end
    rescue Errno::ECHILD
      # Процесс "съеден" другой системой (например, Process.wait вызван в on_term)
      # или процесс уже завершился и pid больше не существует
      # Добавляем [pid, nil] чтобы отметить его и продолжить обработку остальных
      results << [pid, nil]
    end
  end

  Timeouter.loop(2) do
    unless (result = self.wait2(-1))
      break
    end

    results << result
  end

  results
rescue Errno::ECHILD
  results
end

#run(timeout = 0) ⇒ Object

Запуск цикла

Устанавливает обработчики сигналов и запускает #start_loop_forever.

Parameters:

  • timeout (Numeric) (defaults to: 0)

    таймаут цикла в секундах (0 = бесконечный)

Raises:

  • (StandardError)

    если произошла ошибка в цикле



62
63
64
65
66
67
68
69
70
71
# File 'lib/main_loop/loop.rb', line 62

def run(timeout = 0)
  install_signal_handlers(@bus)

  start_loop_forever(timeout)
rescue StandardError => e
  # :nocov:
  logger.fatal("Exception in Main Loop: #{e.inspect}")
  exit!(2)
  # :nocov:
end

#signal(command) ⇒ Object

Обработка сигнала

Parameters:

  • command (String)

    команда вида “sig:NAME”



132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/main_loop/loop.rb', line 132

def signal(command)
  _, sig = command.split(':')
  logger.debug("signal:#{sig}")

  if TERM_SIGNALS.include?(sig)
    @dispatcher.term
  elsif sig == 'CLD'
    # nothing to do child will reap later
  else
    logger.info("unhandled signal:#{sig}")
  end
end

#start_loop_forever(timeout = 0) ⇒ Object

Главный цикл обработки событий

Цикл с ограниченным временем работы (через Timeouter).

Интервал ожидания событий:

wait = [[(timeout / 2.5), 5].min, 5].max

Минимум 5 секунд (даже при timeout = 0)

Parameters:

  • timeout (Numeric) (defaults to: 0)

    таймаут цикла в секундах (0 = бесконечный)



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/main_loop/loop.rb', line 82

def start_loop_forever(timeout = 0)
  # TODO поскольку wait всегда равен 5 секунд, то цикл работы 5 секунд, и потому 
  # timeout для Dispatcher нужно ставить больше 2 циклов, чтобы успели завершиться все потоки или процессы
  wait = [[(timeout / 2.5), 5].min, 5].max
  Timeouter.loop(timeout) do
    event = @bus.gets(wait)
    logger.debug("command:#{event}")

    case event
    when 'term'
      term(event)
    when 'crash'
      crash(event)
    when /sig:/
      signal(event)
    when /reap:/
      reap(event)
    when nil
      logger.debug('Empty event: reaping...')
    else
      logger.debug("unknown event:#{event}")
    end

    @dispatcher.reap(reap_children) rescue nil
    @dispatcher.tick
  end
end

#term(_command) ⇒ Object

Инициировать терминацию

Передает команду терминации диспетчеру (если не уже терминация).

Parameters:

  • _command (String)

    команда (Unused)



150
151
152
# File 'lib/main_loop/loop.rb', line 150

def term(_command)
  @dispatcher.term unless @dispatcher.terminating?
end

#wait2(pid) ⇒ Array<Integer, Process::Status>|nil

Ожидание завершения процесса

Обертка для Process.wait2 с флагом WNOHANG.

:nocov:

Parameters:

  • pid (Integer)

    PID процесса для ожидания

Returns:

  • (Array<Integer, Process::Status>|nil)

    пара (pid, status) или nil если нет завершенных



248
249
250
# File 'lib/main_loop/loop.rb', line 248

def wait2(pid)
  Process.wait2(pid, ::Process::WNOHANG)
end