Class: Lepus::Web::Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/lepus/web/aggregator.rb

Overview

Aggregates process heartbeats from RabbitMQ into in-memory state. Subscribes to the lepus.heartbeat fanout exchange and maintains a thread-safe cache of all processes across connected Lepus apps.

Constant Summary collapse

HEARTBEAT_EXCHANGE =
ProcessRegistry::RabbitmqBackend::HEARTBEAT_EXCHANGE

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stale_threshold: nil) ⇒ Aggregator

Returns a new instance of Aggregator.



16
17
18
19
20
21
22
23
24
25
# File 'lib/lepus/web/aggregator.rb', line 16

def initialize(stale_threshold: nil)
  @stale_threshold = stale_threshold || Lepus.config.process_alive_threshold
  @processes = Concurrent::Map.new
  @connection = nil
  @channel = nil
  @consumer = nil
  @running = Concurrent::AtomicBoolean.new(false)
  @pruning_task = nil
  @mutex = Mutex.new
end

Instance Attribute Details

#stale_thresholdObject (readonly)

Returns the value of attribute stale_threshold.



14
15
16
# File 'lib/lepus/web/aggregator.rb', line 14

def stale_threshold
  @stale_threshold
end

Instance Method Details

#all_processesObject



63
64
65
66
# File 'lib/lepus/web/aggregator.rb', line 63

def all_processes
  prune_stale_entries
  @processes.values.map { |data| data[:process] }
end

#clearObject



76
77
78
# File 'lib/lepus/web/aggregator.rb', line 76

def clear
  @processes.clear
end

#countObject



72
73
74
# File 'lib/lepus/web/aggregator.rb', line 72

def count
  @processes.size
end

#find(id) ⇒ Object



68
69
70
# File 'lib/lepus/web/aggregator.rb', line 68

def find(id)
  @processes[id]&.dig(:process)
end

#running?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/lepus/web/aggregator.rb', line 59

def running?
  @running.true?
end

#startObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/lepus/web/aggregator.rb', line 27

def start
  return if @running.true?

  @mutex.synchronize do
    return if @running.true?

    @running.make_true
    setup_subscription
    start_pruning_task
  end
rescue => e
  Lepus.logger.error("[Web::Aggregator] Failed to start: #{e.message}")
  @running.make_false
end

#stopObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/lepus/web/aggregator.rb', line 42

def stop
  @mutex.synchronize do
    @running.make_false
    @pruning_task&.shutdown
    @consumer&.cancel
    @channel&.close if @channel&.open?
    @connection&.close if @connection&.open?
  end
rescue => e
  Lepus.logger.warn("[Web::Aggregator] Error during shutdown: #{e.message}")
ensure
  @pruning_task = nil
  @consumer = nil
  @channel = nil
  @connection = nil
end