Class: Lepus::Web::Aggregator
- Inherits:
-
Object
- Object
- Lepus::Web::Aggregator
- Defined in:
- lib/lepus/web/aggregator.rb
Overview
Aggregates process heartbeats from RabbitMQ into in-memory state. Subscribes to the lepus.heartbeat fanout exchange and maintains a thread-safe cache of all processes across connected Lepus apps.
Constant Summary collapse
- HEARTBEAT_EXCHANGE =
ProcessRegistry::RabbitmqBackend::HEARTBEAT_EXCHANGE
Instance Attribute Summary collapse
-
#stale_threshold ⇒ Object
readonly
Returns the value of attribute stale_threshold.
Instance Method Summary collapse
- #all_processes ⇒ Object
- #clear ⇒ Object
- #count ⇒ Object
- #find(id) ⇒ Object
-
#initialize(stale_threshold: nil) ⇒ Aggregator
constructor
A new instance of Aggregator.
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(stale_threshold: nil) ⇒ Aggregator
Returns a new instance of Aggregator.
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/lepus/web/aggregator.rb', line 16 def initialize(stale_threshold: nil) @stale_threshold = stale_threshold || Lepus.config.process_alive_threshold @processes = Concurrent::Map.new @connection = nil @channel = nil @consumer = nil @running = Concurrent::AtomicBoolean.new(false) @pruning_task = nil @mutex = Mutex.new end |
Instance Attribute Details
#stale_threshold ⇒ Object (readonly)
Returns the value of attribute stale_threshold.
14 15 16 |
# File 'lib/lepus/web/aggregator.rb', line 14 def stale_threshold @stale_threshold end |
Instance Method Details
#all_processes ⇒ Object
63 64 65 66 |
# File 'lib/lepus/web/aggregator.rb', line 63 def all_processes prune_stale_entries @processes.values.map { |data| data[:process] } end |
#clear ⇒ Object
76 77 78 |
# File 'lib/lepus/web/aggregator.rb', line 76 def clear @processes.clear end |
#count ⇒ Object
72 73 74 |
# File 'lib/lepus/web/aggregator.rb', line 72 def count @processes.size end |
#find(id) ⇒ Object
68 69 70 |
# File 'lib/lepus/web/aggregator.rb', line 68 def find(id) @processes[id]&.dig(:process) end |
#running? ⇒ Boolean
59 60 61 |
# File 'lib/lepus/web/aggregator.rb', line 59 def running? @running.true? end |
#start ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/lepus/web/aggregator.rb', line 27 def start return if @running.true? @mutex.synchronize do return if @running.true? @running.make_true setup_subscription start_pruning_task end rescue => e Lepus.logger.error("[Web::Aggregator] Failed to start: #{e.}") @running.make_false end |
#stop ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/lepus/web/aggregator.rb', line 42 def stop @mutex.synchronize do @running.make_false @pruning_task&.shutdown @consumer&.cancel @channel&.close if @channel&.open? @connection&.close if @connection&.open? end rescue => e Lepus.logger.warn("[Web::Aggregator] Error during shutdown: #{e.}") ensure @pruning_task = nil @consumer = nil @channel = nil @connection = nil end |