Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME = 5.0
Cycle time (in seconds) to check for new microservices
- PROCESS_SHUTDOWN_SECONDS = 5.0
- @@instance = nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/openc3/operators/operator.rb', line 235

# Builds a new Operator: configures the logger, runs OperatorProcess.setup,
# reads tuning values from the environment and creates the (initially empty)
# process tables used by the monitoring cycle.
#
# Environment variables read:
# - OPERATOR_CYCLE_TIME: cycle time in seconds (falls back to CYCLE_TIME)
# - OPENC3_OPERATOR_MAX_START_PER_CYCLE: per-cycle start limit (defaults to 5)
# - OPENC3_RUBY: name of the ruby executable (defaults depend on RUBY_ENGINE)
def initialize
  Logger.level = Logger::INFO
  Logger.microservice_name = 'MicroserviceOperator'
  OperatorProcess.setup

  # Time in seconds between cycles; the constant is used when the variable is unset
  @cycle_time = ENV['OPERATOR_CYCLE_TIME']&.to_f || CYCLE_TIME

  # Maximum number of new microservices to start per cycle. This spreads a
  # large startup burst (e.g. installing a plugin with many targets) across
  # multiple cycles so the shared services (object store, redis) aren't
  # stampeded by every microservice connecting at once. 0 = no limit.
  @max_start_per_cycle = ENV.fetch('OPENC3_OPERATOR_MAX_START_PER_CYCLE', 5).to_i

  # An explicit OPENC3_RUBY wins; otherwise pick a name matching the running engine
  engine_default = RUBY_ENGINE == 'ruby' ? 'ruby' : 'jruby'
  @ruby_process_name = ENV['OPENC3_RUBY'] || engine_default

  @mutex = Mutex.new
  @processes = {}
  @new_processes = {}
  @changed_processes = {}
  @removed_processes = {}
  @shutdown = false
  @shutdown_complete = false
end
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
228 229 230 |
# File 'lib/openc3/operators/operator.rb', line 228

# Read-only accessor.
#
# @return [Object] the current value of the @cycle_time instance variable
def cycle_time
  @cycle_time
end
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
228 229 230 |
# File 'lib/openc3/operators/operator.rb', line 228

# Read-only accessor.
#
# @return [Object] the current value of the @processes instance variable
def processes
  @processes
end
Class Method Details
.instance ⇒ Object
416 417 418 |
# File 'lib/openc3/operators/operator.rb', line 416

# @return [Object] the value held in the @@instance class variable
def self.instance
  @@instance
end
.processes ⇒ Object
412 413 414 |
# File 'lib/openc3/operators/operator.rb', line 412

# Delegates to #processes on the object stored in @@instance.
# While @@instance is still nil this raises NoMethodError.
#
# @return [Object] whatever @@instance.processes returns
def self.processes
  @@instance.processes
end
.run ⇒ Object
407 408 409 410 |
# File 'lib/openc3/operators/operator.rb', line 407

# Creates an instance of the receiving class, stores it in @@instance and
# then calls #run on it.
def self.run
  @@instance = new
  @@instance.run
end
Instance Method Details
#remove_old ⇒ Object
308 309 310 311 312 313 314 315 316 |
# File 'lib/openc3/operators/operator.rb', line 308

# Shuts down every process queued in @removed_processes and then resets the
# queue to an empty hash. Does nothing when the queue is empty. Runs while
# holding @mutex.
def remove_old
  @mutex.synchronize do
    next if @removed_processes.empty?

    Logger.info("Shutting down #{@removed_processes.length} removed microservices...")
    shutdown_processes(@removed_processes)
    @removed_processes = {}
  end
end
#respawn_changed ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/openc3/operators/operator.rb', line 285

# Restarts microservices queued in @changed_processes: the selected batch is
# shut down and then each process in it is started again and removed from the
# queue. Runs while holding @mutex.
def respawn_changed
  @mutex.synchronize do
    next if @changed_processes.empty?

    # Cycle at most @max_start_per_cycle changed microservices this cycle;
    # any remaining stay queued in @changed_processes and are cycled on
    # later cycles. This avoids a restart stampede when many microservices
    # change at once (e.g. a configmap change) overwhelming shared
    # services. Processes not yet cycled keep running until their turn.
    queued_names = @changed_processes.keys
    batch_names = @max_start_per_cycle > 0 ? queued_names.first(@max_start_per_cycle) : queued_names
    batch = @changed_processes.slice(*batch_names)

    Logger.info("Cycling #{batch.length} of #{@changed_processes.length} changed microservices...")
    shutdown_processes(batch)

    # A shutdown requested in the meantime means nothing should be restarted
    break if @shutdown

    batch_names.each do |name|
      @changed_processes[name].start
      @changed_processes.delete(name)
    end
  end
end
#respawn_dead ⇒ Object
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/openc3/operators/operator.rb', line 318

# Walks @processes and restarts any process that is no longer alive, logging
# its extracted output as an error. Iteration stops as soon as @shutdown is
# set. Runs while holding @mutex.
def respawn_dead
  @mutex.synchronize do
    @processes.each do |name, process|
      break if @shutdown

      # Skip processes still queued by the per-cycle start limit; they
      # haven't been started yet so they aren't "dead" to be respawned.
      next if @new_processes[name]

      process.output_increment
      next if process.alive?

      # Respawn process
      died_output = process.extract_output
      Logger.error("Unexpected process died... respawning! #{process.cmd_line}\n#{died_output}\n", scope: process.scope)
      process.hard_stop
      process.start
    end
  end
end
#run ⇒ Object
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'lib/openc3/operators/operator.rb', line 377

# Main loop of the operator. Registers #shutdown as an at_exit hook, then
# repeats the cycle update -> remove_old -> respawn_changed -> start_new ->
# respawn_dead, sleeping @cycle_time seconds between cycles, until @shutdown
# is set. Afterwards it waits for @shutdown_complete before returning.
# A completion message is always logged on the way out.
def run
  # Use at_exit to shutdown cleanly
  at_exit { shutdown }

  # Monitor processes and respawn if died
  Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...")
  loop do
    update
    remove_old
    respawn_changed
    start_new
    respawn_dead
    # Checked both before and after the sleep so a stop request is honoured promptly
    break if @shutdown

    sleep(@cycle_time)
    break if @shutdown
  end

  # Poll until the shutdown flag reports completion
  sleep(0.1) until @shutdown_complete
ensure
  Logger.info("#{self.class} shutdown complete")
end
#shutdown ⇒ Object
367 368 369 370 371 372 373 374 375 |
# File 'lib/openc3/operators/operator.rb', line 367

# Sets @shutdown, then (holding @mutex) shuts down everything in @processes
# and finally sets @shutdown_complete.
def shutdown
  # Set before taking the lock so code that checks @shutdown can see it
  # while this call is still waiting for @mutex
  @shutdown = true
  @mutex.synchronize do
    Logger.info('Shutting down processes...')
    shutdown_processes(@processes)
    Logger.info('Shutting down processes complete')
    @shutdown_complete = true
  end
end
#shutdown_processes(processes) ⇒ Object
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/openc3/operators/operator.rb', line 337

# Stops the given processes: first commands a soft stop on each, waits up to
# PROCESS_SHUTDOWN_SECONDS for them to exit, then issues a hard stop to every
# process (including those that already exited).
#
# The grace period is measured with the monotonic clock so that wall-clock
# adjustments (NTP step, manual change) can neither cut the wait short nor
# stretch it past PROCESS_SHUTDOWN_SECONDS.
#
# @param processes [Hash] process name => process object; never mutated here
def shutdown_processes(processes)
  # Work on a copy so entries can be dropped without touching the caller's hash
  remaining = processes.dup

  Logger.info("Commanding soft stops...")
  remaining.each_value { |process| process.soft_stop }

  # Allow sufficient time for processes to shutdown cleanly
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + PROCESS_SHUTDOWN_SECONDS
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    # Drop every process that has exited since the last poll
    remaining.delete_if do |_name, process|
      next false if process.alive?

      Logger.debug("Soft stop process successful: #{process.cmd_line}", scope: process.scope)
      true
    end
    if remaining.empty?
      Logger.debug("Soft stop all successful")
      break
    end
    sleep(0.1)
  end

  # Hard stop everything that was passed in, not only the stragglers
  Logger.debug("Commanding hard stops...")
  processes.each_value do |process|
    process.output_increment
    process.extract_output
    process.hard_stop
  end
end
#start_new ⇒ Object
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/openc3/operators/operator.rb', line 267

# Starts processes queued in @new_processes and removes each one from the
# queue once it has been started. Does nothing when the queue is empty.
# Runs while holding @mutex.
def start_new
  @mutex.synchronize do
    next if @new_processes.empty?

    # Start at most @max_start_per_cycle processes this cycle; any
    # remaining stay queued in @new_processes and start on later cycles.
    # This avoids a startup stampede when many microservices appear at
    # once (e.g. a plugin install) overwhelming the object store / redis.
    queued_names = @new_processes.keys
    batch_names = @max_start_per_cycle > 0 ? queued_names.first(@max_start_per_cycle) : queued_names

    Logger.info("#{self.class} starting #{batch_names.length} of #{@new_processes.length} new process(es)...")
    batch_names.each do |name|
      @new_processes[name].start
      @new_processes.delete(name)
    end
  end
end
#stop ⇒ Object
403 404 405 |
# File 'lib/openc3/operators/operator.rb', line 403

# Requests a stop by setting the @shutdown flag; nothing else is done here.
#
# @return [true]
def stop
  @shutdown = true
end
#update ⇒ Object
263 264 265 |
# File 'lib/openc3/operators/operator.rb', line 263

# Placeholder that always raises; per its message, subclasses are expected
# to provide the real implementation.
#
# @raise [RuntimeError] always, with the message "Implement in subclass"
def update
  raise RuntimeError, 'Implement in subclass'
end