Class: Async::Background::Runner
- Inherits:
- Object (inheritance chain: Object → Async::Background::Runner)
- Includes:
- Clock
- Defined in:
- lib/async/background/runner.rb
Instance Attribute Summary
-
#heap ⇒ Object
readonly
Returns the value of attribute heap.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#queue_store ⇒ Object
readonly
Returns the value of attribute queue_store.
-
#semaphore ⇒ Object
readonly
Returns the value of attribute semaphore.
-
#shutdown ⇒ Object
readonly
Returns the value of attribute shutdown.
-
#total_workers ⇒ Object
readonly
Returns the value of attribute total_workers.
-
#worker_index ⇒ Object
readonly
Returns the value of attribute worker_index.
Instance Method Summary
-
#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true) ⇒ Runner
constructor
A new instance of Runner.
- #run ⇒ Object
- #running? ⇒ Boolean
- #stop ⇒ Object
Constructor Details
#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true) ⇒ Runner
Returns a new instance of Runner.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/async/background/runner.rb', line 20

# Builds a runner for a single worker.
#
# Records the worker coordinates, marks the runner as running, and creates
# the shutdown condition, the metrics object, the job semaphore and the job
# heap. Queue wiring is delegated to +setup_queue+.
#
# @param config_path [Object] handed to +build_heap+ (presumably a path to
#   the job configuration file — confirm against +build_heap+)
# @param job_count [Integer] value passed to +Async::Semaphore.new+
# @param worker_index [Object] this worker's index; stored, logged and passed to Metrics
# @param total_workers [Object] total worker count; stored, logged and passed to Metrics
# @param queue_socket_dir [Object, nil] forwarded unchanged to +setup_queue+
# @param queue_db_path [Object, nil] forwarded unchanged to +setup_queue+
# @param queue_mmap [Boolean] forwarded unchanged to +setup_queue+
# @return [Runner] a new instance of Runner
def initialize(
  config_path:, job_count: 2, worker_index:, total_workers:,
  queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true
)
  @logger        = Console.logger
  @worker_index  = worker_index
  @total_workers = total_workers
  @running       = true
  @shutdown      = ::Async::Condition.new
  @metrics       = Metrics.new(worker_index: worker_index, total_workers: total_workers)

  logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" }

  @semaphore = ::Async::Semaphore.new(job_count)
  @heap      = build_heap(config_path)

  setup_queue(queue_socket_dir, queue_db_path, queue_mmap)
end
Instance Attribute Details
#heap ⇒ Object (readonly)
Returns the value of attribute heap.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute heap.
#
# The constructor assigns it from +build_heap(config_path)+.
#
# @return [Object] the current value of +@heap+
def heap
  @heap
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute logger.
#
# The constructor assigns it from +Console.logger+.
#
# @return [Object] the current value of +@logger+
def logger
  @logger
end
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute metrics.
#
# The constructor assigns it a +Metrics+ instance created with this
# worker's +worker_index+ and +total_workers+.
#
# @return [Object] the current value of +@metrics+
def metrics
  @metrics
end
#queue_store ⇒ Object (readonly)
Returns the value of attribute queue_store.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute queue_store.
#
# NOTE(review): the assignment is not visible in the constructor body;
# presumably +setup_queue+ sets it — confirm. It may be nil, since #run
# closes it with safe navigation (+&.+).
#
# @return [Object, nil] the current value of +@queue_store+
def queue_store
  @queue_store
end
#semaphore ⇒ Object (readonly)
Returns the value of attribute semaphore.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute semaphore.
#
# The constructor assigns it +::Async::Semaphore.new(job_count)+.
#
# @return [Object] the current value of +@semaphore+
def semaphore
  @semaphore
end
#shutdown ⇒ Object (readonly)
Returns the value of attribute shutdown.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute shutdown.
#
# The constructor assigns it +::Async::Condition.new+; #stop signals it.
#
# @return [Object] the current value of +@shutdown+
def shutdown
  @shutdown
end
#total_workers ⇒ Object (readonly)
Returns the value of attribute total_workers.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute total_workers.
#
# The constructor stores the +total_workers:+ keyword argument here.
#
# @return [Object] the current value of +@total_workers+
def total_workers
  @total_workers
end
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
18 19 20 |
# File 'lib/async/background/runner.rb', line 18

# Returns the value of attribute worker_index.
#
# The constructor stores the +worker_index:+ keyword argument here.
#
# @return [Object] the current value of +@worker_index+
def worker_index
  @worker_index
end
Instance Method Details
#run ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/async/background/runner.rb', line 39

# Runs the worker inside an +Async+ block.
#
# Installs signal handling, starts the signal watcher, starts the queue
# listener when +@listen_queue+ is set, and then runs the scheduler loop.
# Once the loop returns, a semaphore permit is acquired (with an empty
# block) before the queue resources are closed.
#
# The queue store and queue waker are closed in an +ensure+ so they are
# released even when setup or the scheduler loop raises.
#
# @return [Object] whatever the +Async+ call returns
def run
  Async do |task|
    begin
      setup_signal_handlers
      start_signal_watcher(task)
      start_queue_listener(task) if @listen_queue

      scheduler_loop(task)

      # NOTE(review): this takes a single permit; presumably it is meant to
      # wait for in-flight jobs — confirm one permit is enough when
      # job_count > 1.
      semaphore.acquire {}
    ensure
      # Either resource may be nil, hence the safe navigation.
      @queue_store&.close
      @queue_waker&.close
    end
  end
end
#running? ⇒ Boolean
62 63 64 |
# File 'lib/async/background/runner.rb', line 62

# Reports whether the runner is still running.
#
# The flag is set to true by the constructor and to false by #stop.
#
# @return [Boolean] the current value of +@running+
def running?
  @running
end
#stop ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/async/background/runner.rb', line 53

# Requests a graceful stop.
#
# Does nothing when the runner is already stopped, so repeated calls are
# harmless. Otherwise it clears the running flag, logs the stop, and
# signals the shutdown condition and (when present) the queue waker.
#
# @return [Object, nil] nil when already stopped; otherwise the result of
#   the final +@queue_waker&.signal+ call
def stop
  return unless @running

  @running = false
  logger.info { "Async::Background: stopping gracefully" }
  shutdown.signal
  @queue_waker&.signal
end