Class: Async::Background::Runner
- Inherits:
-
Object
- Object
- Async::Background::Runner
- Includes:
- Clock, QueueExecution, Schedule
- Defined in:
- lib/async/background/runner.rb,
lib/async/background/runner/schedule.rb,
lib/async/background/runner/queue_execution.rb
Defined Under Namespace
Modules: QueueExecution, Schedule
Instance Attribute Summary collapse
-
#heap ⇒ Object
readonly
Returns the value of attribute heap.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#queue_store ⇒ Object
readonly
Returns the value of attribute queue_store.
-
#semaphore ⇒ Object
readonly
Returns the value of attribute semaphore.
-
#shutdown ⇒ Object
readonly
Returns the value of attribute shutdown.
-
#total_workers ⇒ Object
readonly
Returns the value of attribute total_workers.
-
#worker_index ⇒ Object
readonly
Returns the value of attribute worker_index.
Instance Method Summary collapse
-
#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner
constructor
A new instance of Runner.
- #run ⇒ Object
- #running? ⇒ Boolean
- #stop ⇒ Object
Constructor Details
#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner
Returns a new instance of Runner.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/async/background/runner.rb', line 32 def initialize( config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path ) @logger = Console.logger @worker_index = worker_index @total_workers = total_workers @running = true @shutdown = ::Async::Condition.new @metrics = Metrics.new( worker_index: worker_index, total_workers: total_workers, shm_path: metrics_shm_path ) logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" } @drain_barrier = ::Async::Barrier.new @semaphore = ::Async::Semaphore.new(job_count, parent: @drain_barrier) @heap = build_heap(config_path) setup_queue(queue_socket_dir, queue_db_path, queue_mmap) end |
Instance Attribute Details
#heap ⇒ Object (readonly)
Returns the value of attribute heap.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def heap @heap end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def logger @logger end |
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def metrics @metrics end |
#queue_store ⇒ Object (readonly)
Returns the value of attribute queue_store.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def queue_store @queue_store end |
#semaphore ⇒ Object (readonly)
Returns the value of attribute semaphore.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def semaphore @semaphore end |
#shutdown ⇒ Object (readonly)
Returns the value of attribute shutdown.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def shutdown @shutdown end |
#total_workers ⇒ Object (readonly)
Returns the value of attribute total_workers.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def total_workers @total_workers end |
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def worker_index @worker_index end |
Instance Method Details
#run ⇒ Object
60 61 62 63 64 65 66 67 68 69 |
# File 'lib/async/background/runner.rb', line 60 def run Async do |task| setup_signal_handlers start_signal_watcher(task) start_queue_listener(task) if @listen_queue scheduler_loop(task) drain_and_close_queue end end |
#running? ⇒ Boolean
80 |
# File 'lib/async/background/runner.rb', line 80 def running? = @running |
#stop ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/async/background/runner.rb', line 71 def stop return unless @running @running = false logger.info { 'Async::Background: stopping gracefully' } shutdown.signal @queue_waker&.signal end |