Class: MainLoop::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/main_loop/dispatcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bus, timeout: 5, logger: nil) ⇒ Dispatcher

Инициализация

Parameters:

  • bus (Bus)

    канал событий

  • timeout (Integer) (defaults to: 5)

    таймаут для принудительного завершения в секундах (по умолчанию 5)

  • logger (Logger) (defaults to: nil)

    логгер (по умолчанию Logger.new(nil))

Options Hash (bus):

Options Hash (timeout:):

  • 5 (Integer)

Options Hash (logger:):

  • nil (Logger)


48
49
50
51
52
53
54
55
# File 'lib/main_loop/dispatcher.rb', line 48

def initialize(bus, timeout: 5, logger: nil)
  super()
  @bus = bus
  @timeout = timeout
  @handlers = []
  @logger = logger || Logger.new(nil)
  @exit_code = 0
end

Instance Attribute Details

#busObject (readonly)

Returns the value of attribute bus.



38
39
40
# File 'lib/main_loop/dispatcher.rb', line 38

def bus
  @bus
end

#handlersObject (readonly)

Returns the value of attribute handlers.



38
39
40
# File 'lib/main_loop/dispatcher.rb', line 38

def handlers
  @handlers
end

#loggerObject (readonly)

Returns the value of attribute logger.



38
39
40
# File 'lib/main_loop/dispatcher.rb', line 38

def logger
  @logger
end

Instance Method Details

#add_handler(handler) ⇒ Object

Регистрация обработчика

Добавляет обработчик в список. Если уже происходит терминация, сразу посылает ‘term` новому обработчику.

Parameters:

  • handler (Handler)

    обработчик для регистрации



93
94
95
96
97
98
# File 'lib/main_loop/dispatcher.rb', line 93

def add_handler(handler)
  synchronize do
    handler.term if terminating?
    handlers << handler
  end
end

#crashObject

Отправить сигнал аварийного завершения

Устанавливает код выхода 3 и инициирует терминацию. Если уже терминация — ничего не делает.



135
136
137
138
# File 'lib/main_loop/dispatcher.rb', line 135

def crash
  @exit_code = 3
  term unless terminating?
end

#log_statusObject

Логировать статус

Логирует текущее состояние обработчиков (DEBUG уровень). Формат: “Total:N Running:M Finished:K. Success:[] TERM”

:nocov:



200
201
202
203
204
205
206
# File 'lib/main_loop/dispatcher.rb', line 200

def log_status
  total = handlers.size
  running = handlers.count(&:running?)
  finihsed = handlers.count(&:finished?)
  term_text = terminating? ? 'TERM' : ''
  logger.debug("Total:#{total} Running:#{running} Finished:#{finihsed} Success:#{handlers.map {|h| h.success?}.to_s}. #{term_text}".strip)
end

#need_force_kill?Boolean

Проверка необходимости принудительного завершения

Проверяет, превышен ли timeout с момента начала терминации.

Returns:

  • (Boolean)

    true если timeout превышен



162
163
164
# File 'lib/main_loop/dispatcher.rb', line 162

def need_force_kill?
  @terminating_at && (Time.now - @terminating_at) >= @timeout
end

#pidsArray<Integer>

Получить список PID процессов

Returns:

  • (Array<Integer>)

    массив PID всех процессовых обработчиков



169
170
171
# File 'lib/main_loop/dispatcher.rb', line 169

def pids
  handlers.map{|h| h.pid rescue nil }.compact
end

#reap(statuses) ⇒ Object

Обработка завершения процессов

Обрабатывает массив завершенных процессов.

Examples:

dispatcher.reap([[123, status], [456, nil]])

Parameters:

  • statuses (Array<Array>)

    массив пар (pid, status)



64
65
66
67
68
# File 'lib/main_loop/dispatcher.rb', line 64

def reap(statuses)
  statuses.each do |(pid, status)|
    reap_by_id(pid, status)
  end
end

#reap_by_id(id, status) ⇒ Object

Обработка завершения процесса по ID

Находит обработчика по ID и вызывает его Handler#reap.

Parameters:

  • id (String)

    идентификатор обработчика (pid для процессов, object_id для потоков)

  • status (Process::Status|nil)

    статус завершения или nil если неизвестен



76
77
78
79
80
81
82
83
84
85
# File 'lib/main_loop/dispatcher.rb', line 76

def reap_by_id(id, status)
  synchronize do
    if (handler = handlers.find {|h| h.id == id })
      logger.info("Reap handler #{handler.name.inspect}. Status: #{status&.inspect}")
      handler.reap(status)
    else
      logger.debug("Reap unknown handler. Status: #{status&.inspect}. Skipped")
    end
  end
end

#termObject

Инициировать терминацию

Отправляет сигнал терминации всем обработчикам. Если уже в процессе терминации — принудительное завершение (kill).

Если это первый вызов:

  • Устанавливает @terminating_at = Time.now

  • Отправляет term каждому обработчику

Если уже терминация:

  • Отправляет kill каждому обработчику



118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/main_loop/dispatcher.rb', line 118

def term
  synchronize do
    if terminating?
      logger.info('Terminate FORCE all handlers')
      handlers.each(&:kill)
    else
      @terminating_at ||= Time.now
      logger.info('Terminate all handlers')
      handlers.each(&:term)
    end
  end
end

#terminating?Time|nil

Проверка терминации

Returns:

  • (Time|nil)

    момент начала терминации или nil если не терминация



103
104
105
# File 'lib/main_loop/dispatcher.rb', line 103

def terminating?
  @terminating_at
end

#tickObject

Тик цикла диспетчера

Вызывается в каждом цикле MainLoop#start_loop_forever. Проверяет необходимость принудительного завершения по timeout.



144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/main_loop/dispatcher.rb', line 144

def tick
  log_status if logger.debug?
  return unless terminating?

  try_exit!

  return if @killed || !need_force_kill?

  @killed = true
  logger.info('Killing all handlers by timeout')
  handlers.each(&:kill)
end

#try_exit!Object

Завершить программу

Если все обработчики завершены, вызывает exit с соответствующим кодом.

Код выхода:

  • @exit_code если все обработчики завершились успешно

  • 1 если любой обработчик завершился с ошибкой

:nocov:



182
183
184
185
186
187
188
189
190
191
# File 'lib/main_loop/dispatcher.rb', line 182

def try_exit!
  synchronize do
    return unless handlers.all?(&:finished?)

    logger.info('All handlers finished exiting...')
    status = handlers.all?(&:success?) ? @exit_code : 1
    logger.info("Exit: #{status}")
    exit status
  end
end