Class: Async::Background::Runner

Inherits:
Object
  • Object
show all
Includes:
Clock
Defined in:
lib/async/background/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_path:, job_count: 2, worker_index:, total_workers:, queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true) ⇒ Runner

Returns a new instance of Runner.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/async/background/runner.rb', line 20

def initialize(
  config_path:, job_count: 2, worker_index:, total_workers:,
  queue_socket_dir: nil, queue_db_path: nil, queue_mmap: true
)
  @logger        = Console.logger
  @worker_index  = worker_index
  @total_workers = total_workers
  @running       = true
  @shutdown      = ::Async::Condition.new
  @metrics       = Metrics.new(worker_index: worker_index, total_workers: total_workers)

  logger.info { "Async::Background worker_index=#{worker_index}/#{total_workers}, job_count=#{job_count}" }

  @semaphore = ::Async::Semaphore.new(job_count)
  @heap      = build_heap(config_path)

  setup_queue(queue_socket_dir, queue_db_path, queue_mmap)
end

Instance Attribute Details

#heapObject (readonly)

Returns the value of attribute heap.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def heap
  @heap
end

#loggerObject (readonly)

Returns the value of attribute logger.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def logger
  @logger
end

#metricsObject (readonly)

Returns the value of attribute metrics.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def metrics
  @metrics
end

#queue_storeObject (readonly)

Returns the value of attribute queue_store.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def queue_store
  @queue_store
end

#semaphoreObject (readonly)

Returns the value of attribute semaphore.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def semaphore
  @semaphore
end

#shutdownObject (readonly)

Returns the value of attribute shutdown.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def shutdown
  @shutdown
end

#total_workersObject (readonly)

Returns the value of attribute total_workers.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def total_workers
  @total_workers
end

#worker_indexObject (readonly)

Returns the value of attribute worker_index.



18
19
20
# File 'lib/async/background/runner.rb', line 18

def worker_index
  @worker_index
end

Instance Method Details

#runObject



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/async/background/runner.rb', line 39

def run
  Async do |task|
    setup_signal_handlers
    start_signal_watcher(task)
    start_queue_listener(task) if @listen_queue

    scheduler_loop(task)

    semaphore.acquire {}
    @queue_store&.close
    @queue_waker&.close
  end
end

#running?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/async/background/runner.rb', line 62

def running?
  @running
end

#stopObject



53
54
55
56
57
58
59
60
# File 'lib/async/background/runner.rb', line 53

def stop
  return unless @running

  @running = false
  logger.info { "Async::Background: stopping gracefully" }
  shutdown.signal
  @queue_waker&.signal
end