Class: Async::Background::Runner

Inherits:
Object
  • Object
show all
Includes:
Clock, QueueExecution, Schedule
Defined in:
lib/async/background/runner.rb,
lib/async/background/runner/schedule.rb,
lib/async/background/runner/queue_execution.rb

Defined Under Namespace

Modules: QueueExecution, Schedule

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true, metrics_shm_path: Metrics.default_shm_path) ⇒ Runner

Returns a new instance of Runner.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/async/background/runner.rb', line 32

def initialize(
  config_path:,
  job_count: 2,
  worker_index:,
  total_workers:,
  queue_socket_dir: nil,
  queue_db_path: nil,
  queue_mmap: true,
  metrics_shm_path: Metrics.default_shm_path
)
  @logger = Console.logger
  @worker_index = worker_index
  @total_workers = total_workers
  @running = true
  @shutdown = ::Async::Condition.new
  @metrics = Metrics.new(
    worker_index: worker_index,
    total_workers: total_workers,
    shm_path: metrics_shm_path
  )
  logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" }

  @drain_barrier = ::Async::Barrier.new
  @semaphore = ::Async::Semaphore.new(job_count, parent: @drain_barrier)
  @heap = build_heap(config_path)
  setup_queue(queue_socket_dir, queue_db_path, queue_mmap)
end

Instance Attribute Details

#heapObject (readonly)

Returns the value of attribute heap.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def heap
  @heap
end

#loggerObject (readonly)

Returns the value of attribute logger.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def logger
  @logger
end

#metricsObject (readonly)

Returns the value of attribute metrics.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def metrics
  @metrics
end

#queue_storeObject (readonly)

Returns the value of attribute queue_store.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def queue_store
  @queue_store
end

#semaphoreObject (readonly)

Returns the value of attribute semaphore.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def semaphore
  @semaphore
end

#shutdownObject (readonly)

Returns the value of attribute shutdown.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def shutdown
  @shutdown
end

#total_workersObject (readonly)

Returns the value of attribute total_workers.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def total_workers
  @total_workers
end

#worker_indexObject (readonly)

Returns the value of attribute worker_index.



23
24
25
# File 'lib/async/background/runner.rb', line 23

def worker_index
  @worker_index
end

Instance Method Details

#runObject



60
61
62
63
64
65
66
67
68
69
# File 'lib/async/background/runner.rb', line 60

def run
  Async do |task|
    setup_signal_handlers
    start_signal_watcher(task)
    start_queue_listener(task) if @listen_queue

    scheduler_loop(task)
    drain_and_close_queue
  end
end

#running?Boolean

Returns:

  • (Boolean)


80
# File 'lib/async/background/runner.rb', line 80

def running? = @running

#stopObject



71
72
73
74
75
76
77
78
# File 'lib/async/background/runner.rb', line 71

def stop
  return unless @running

  @running = false
  logger.info { 'Async::Background: stopping gracefully' }
  shutdown.signal
  @queue_waker&.signal
end