Class: Cosmo::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/cosmo/engine.rb

Constant Summary collapse

# Lookup table from a processor type key (as accepted by #run) to the
# processor class started for that key. Frozen so it cannot be mutated.
PROCESSORS = {
  jobs: Job::Processor,
  streams: Stream::Processor
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Engine

Returns a new instance of Engine.



20
21
22
23
24
# File 'lib/cosmo/engine.rb', line 20

# Builds the engine's shared state.
#
# Reads the :concurrency setting from Config (falling back to 1), hands that
# value to Utils::ThreadPool.new, and creates the atomic flag that is later
# passed to the processors and cleared by #shutdown. The flag starts as false.
def initialize
  configured = Config.fetch(:concurrency, 1)

  @concurrency = configured
  @pool = Utils::ThreadPool.new(configured)
  @running = Concurrent::AtomicBoolean.new(false)
end

Class Method Details

.instance ⇒ Object



16
17
18
# File 'lib/cosmo/engine.rb', line 16

# Returns the memoized engine, creating it with +new+ on first use.
#
# @return [Object] the same object on every call once it has been built
def self.instance
  return @instance if @instance

  @instance = new
end

.run(...) ⇒ Object



12
13
14
# File 'lib/cosmo/engine.rb', line 12

# Class-level convenience wrapper: forwards every argument (and any block)
# unchanged to #run on the memoized instance.
#
# @return [Object] whatever the instance-level #run returns
def self.run(...)
  engine = instance
  engine.run(...)
end

Instance Method Details

#run(type, options) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/cosmo/engine.rb', line 26

# Starts the selected processors and blocks until the signal handler reports
# a signal, then shuts down.
#
# Calls Utils::Signal.trap(:INT, :TERM) to obtain a handler, then starts
# either the single processor registered under +type+ in PROCESSORS or, when
# +type+ is nil/false or not a known key, every registered processor. If the
# running flag is still false after the processors were started, a warning is
# logged and the method returns without waiting.
#
# @param type [#to_sym, nil] processor key looked up in PROCESSORS
# @param options [Object] passed unchanged to each processor's +run+
# @return [Object, nil] nil when no processor is running, otherwise the
#   result of #shutdown
def run(type, options)
  handler = Utils::Signal.trap(:INT, :TERM)
  Logger.info("Starting processing, hit Ctrl-C to stop")

  # A missing or unknown key falls back to running every processor.
  key = type.to_sym if type
  selected = PROCESSORS.key?(key) ? [PROCESSORS[key]] : PROCESSORS.values
  @processors = selected.map { |processor| processor.run(@pool, @running, options) }

  if @running.false?
    Logger.warn("Shutting down... (No processors are running)")
    return
  end

  # handler.wait blocks here; its return value is the signal shown in the log.
  Logger.info("Shutting down... (#{handler.wait} received)")
  shutdown
end

#shutdown ⇒ Object



42
43
44
45
46
47
48
# File 'lib/cosmo/engine.rb', line 42

# Stops the engine: clears the running flag, asks the pool to shut down, and
# then waits on the pool using the :timeout value read from Config.
#
# @return [Object] the result of the final Logger.info call
def shutdown
  @running.make_false
  @pool.shutdown

  Logger.info("Pausing to allow jobs to finish...")
  grace_period = Config[:timeout]
  @pool.wait_for_termination(grace_period)

  Logger.info("Bye!")
end