Class: Async::Background::Runner

Inherits:
Object
  • Object
show all
Includes:
Clock, QueueExecution, Schedule
Defined in:
lib/async/background/runner.rb,
lib/async/background/runner/schedule.rb,
lib/async/background/runner/queue_execution.rb

Defined Under Namespace

Modules: QueueExecution, Schedule

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_path: nil, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner

config_path: nil explicitly disables recurring jobs. This keeps the dynamic SQLite queue usable on its own; a supplied path remains strict so a typo cannot silently disable scheduled work.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/async/background/runner.rb', line 35

def initialize(
  config_path: nil,
  job_count: 2,
  worker_index:,
  total_workers:,
  queue_socket_dir: nil,
  queue_db_path: nil,
  queue_mmap: true,
  metrics_shm_path: Metrics.default_shm_path
)
  @logger = Console.logger
  @worker_index = worker_index
  @total_workers = total_workers
  @running = true
  @shutdown = ::Async::Condition.new
  @metrics = Metrics.new(
    worker_index: worker_index,
    total_workers: total_workers,
    shm_path: metrics_shm_path
  )
  logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" }

  @drain_barrier = ::Async::Barrier.new
  @semaphore = ::Async::Semaphore.new(job_count, parent: @drain_barrier)
  @heap = config_path.nil? ? MinHeap.new : build_heap(config_path)
  setup_queue(queue_socket_dir, queue_db_path, queue_mmap)
  validate_work_source!(config_path)
end

Instance Attribute Details

#heapObject (readonly)

Returns the value of attribute heap.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def heap
  @heap
end

#loggerObject (readonly)

Returns the value of attribute logger.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def logger
  @logger
end

#metricsObject (readonly)

Returns the value of attribute metrics.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def metrics
  @metrics
end

#queue_storeObject (readonly)

Returns the value of attribute queue_store.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def queue_store
  @queue_store
end

#semaphoreObject (readonly)

Returns the value of attribute semaphore.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def semaphore
  @semaphore
end

#shutdownObject (readonly)

Returns the value of attribute shutdown.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def shutdown
  @shutdown
end

#total_workersObject (readonly)

Returns the value of attribute total_workers.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def total_workers
  @total_workers
end

#worker_indexObject (readonly)

Returns the value of attribute worker_index.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def worker_index
  @worker_index
end

Instance Method Details

#runObject



64
65
66
67
68
69
70
71
72
73
# File 'lib/async/background/runner.rb', line 64

def run
  Async do |task|
    setup_signal_handlers
    start_signal_watcher(task)
    start_queue_listener(task) if @listen_queue

    scheduler_loop(task)
    drain_and_close_queue
  end
end

#running?Boolean

Returns:

  • (Boolean)


84
# File 'lib/async/background/runner.rb', line 84

def running? = @running

#stopObject



75
76
77
78
79
80
81
82
# File 'lib/async/background/runner.rb', line 75

def stop
  return unless @running

  @running = false
  logger.info { 'Async::Background: stopping gracefully' }
  shutdown.signal
  @queue_waker&.signal
end