Class: Async::Background::Runner
- Inherits:
-
Object
- Object
- Async::Background::Runner
- Includes:
- Clock, QueueExecution, Schedule
- Defined in:
- lib/async/background/runner.rb,
lib/async/background/runner/schedule.rb,
lib/async/background/runner/queue_execution.rb
Defined Under Namespace
Modules: QueueExecution, Schedule
Instance Attribute Summary collapse
-
#heap ⇒ Object
readonly
Returns the value of attribute heap.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#queue_store ⇒ Object
readonly
Returns the value of attribute queue_store.
-
#semaphore ⇒ Object
readonly
Returns the value of attribute semaphore.
-
#shutdown ⇒ Object
readonly
Returns the value of attribute shutdown.
-
#total_workers ⇒ Object
readonly
Returns the value of attribute total_workers.
-
#worker_index ⇒ Object
readonly
Returns the value of attribute worker_index.
Instance Method Summary collapse
-
#initialize(config_path: nil, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner
constructor
config_path: nilexplicitly disables recurring jobs. - #run ⇒ Object
- #running? ⇒ Boolean
- #stop ⇒ Object
Constructor Details
#initialize(config_path: nil, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner
config_path: nil explicitly disables recurring jobs. This keeps the
dynamic SQLite queue usable on its own; a supplied path remains strict
so a typo cannot silently disable scheduled work.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/async/background/runner.rb', line 35 def initialize( config_path: nil, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path ) @logger = Console.logger @worker_index = worker_index @total_workers = total_workers @running = true @shutdown = ::Async::Condition.new @metrics = Metrics.new( worker_index: worker_index, total_workers: total_workers, shm_path: metrics_shm_path ) logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" } @drain_barrier = ::Async::Barrier.new @semaphore = ::Async::Semaphore.new(job_count, parent: @drain_barrier) @heap = config_path.nil? ? MinHeap.new : build_heap(config_path) setup_queue(queue_socket_dir, queue_db_path, queue_mmap) validate_work_source!(config_path) end |
Instance Attribute Details
#heap ⇒ Object (readonly)
Returns the value of attribute heap.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def heap @heap end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def logger @logger end |
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def metrics @metrics end |
#queue_store ⇒ Object (readonly)
Returns the value of attribute queue_store.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def queue_store @queue_store end |
#semaphore ⇒ Object (readonly)
Returns the value of attribute semaphore.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def semaphore @semaphore end |
#shutdown ⇒ Object (readonly)
Returns the value of attribute shutdown.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def shutdown @shutdown end |
#total_workers ⇒ Object (readonly)
Returns the value of attribute total_workers.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def total_workers @total_workers end |
#worker_index ⇒ Object (readonly)
Returns the value of attribute worker_index.
23 24 25 |
# File 'lib/async/background/runner.rb', line 23 def worker_index @worker_index end |
Instance Method Details
#run ⇒ Object
64 65 66 67 68 69 70 71 72 73 |
# File 'lib/async/background/runner.rb', line 64 def run Async do |task| setup_signal_handlers start_signal_watcher(task) start_queue_listener(task) if @listen_queue scheduler_loop(task) drain_and_close_queue end end |
#running? ⇒ Boolean
84 |
# File 'lib/async/background/runner.rb', line 84 def running? = @running |
#stop ⇒ Object
75 76 77 78 79 80 81 82 |
# File 'lib/async/background/runner.rb', line 75 def stop return unless @running @running = false logger.info { 'Async::Background: stopping gracefully' } shutdown.signal @queue_waker&.signal end |