Module: Legion::Extensions::Node::Helpers::Rabbitmq

Defined in:
lib/legion/extensions/node/helpers/rabbitmq.rb

Class Method Summary collapse

Class Method Details

.api_get(http, path, settings) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 82

# Sends an authenticated GET to the management API and decodes the JSON reply.
#
# Credentials come from settings[:connection][:user] and
# settings[:connection][:password]; each falls back to 'guest'.
#
# @param http [#request] client used to execute the request
# @param path [String] request path, e.g. '/api/nodes'
# @param settings [Hash] transport settings
# @return [Object, nil] the parsed JSON body, or nil when the status code does
#   not start with '2' or when any StandardError is raised along the way
def api_get(http, path, settings)
  request = Net::HTTP::Get.new(path)
  account = settings.dig(:connection, :user) || 'guest'
  secret  = settings.dig(:connection, :password) || 'guest'
  request.basic_auth(account, secret)

  reply = http.request(request)
  reply.code.start_with?('2') ? ::JSON.parse(reply.body) : nil
rescue StandardError
  # Best-effort probe: every failure is surfaced to the caller as nil.
  nil
end

.build_http(settings) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 73

# Builds an unopened Net::HTTP client for the management API.
#
# @param settings [Hash] transport settings; reads settings[:connection][:host]
#   (default '127.0.0.1') and settings[:management_port] (default 15672)
# @return [Net::HTTP] client with a 3 second open timeout and a 5 second read timeout
def build_http(settings)
  address = settings.dig(:connection, :host) || '127.0.0.1'
  api_port = settings[:management_port] || 15_672

  Net::HTTP.new(address, api_port).tap do |client|
    client.open_timeout = 3
    client.read_timeout = 5
  end
end

.cluster_health(settings: nil) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 14

# Collects the three RabbitMQ health probes into a single report.
#
# @param settings [Hash, nil] transport settings; resolve_settings is used when
#   nothing is supplied
# @return [Hash] hash with :node_count, :quorum_leaders and :shovel_links keys;
#   when one of the listed connection errors is raised, a warning is logged and
#   every key holds the unreachable placeholder
def cluster_health(settings: nil)
  effective = settings || resolve_settings
  client = build_http(effective)

  node_report   = fetch_node_count(client, effective)
  quorum_report = fetch_quorum_leaders(client, effective)
  shovel_report = fetch_shovel_links(client, effective)

  { node_count: node_report, quorum_leaders: quorum_report, shovel_links: shovel_report }
rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, SocketError, Net::OpenTimeout, Net::ReadTimeout => error
  log.warn("RabbitMQ management API unreachable: #{error.message}")
  # One placeholder hash per probe, so the entries never share an object.
  %i[node_count quorum_leaders shovel_links].each_with_object({}) do |probe, report|
    report[probe] = unreachable
  end
end

.fetch_node_count(http, settings) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 27

# Summarises how many entries returned by /api/nodes are flagged as running.
#
# @param http [Object] client handed to api_get
# @param settings [Hash] transport settings handed to api_get
# @return [Hash] { status:, running:, total: } where status is 'ok' when every
#   node is running, 'warn' when only some are and 'critical' when none are;
#   the unreachable placeholder when api_get yields nothing
def fetch_node_count(http, settings)
  nodes = api_get(http, '/api/nodes', settings)
  return unreachable unless nodes

  total = nodes.size
  running = nodes.count { |node| node['running'] }
  status =
    if running == total
      'ok'
    elsif running.positive?
      'warn'
    else
      'critical'
    end

  { status: status, running: running, total: total }
end

.fetch_quorum_leaders(http, settings) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 41

# Reports how many quorum queues exist in the configured vhost and how many of
# them name the node returned by resolve_node_name as their leader.
#
# @param http [Object] client handed to api_get
# @param settings [Hash] transport settings; reads settings[:connection][:vhost]
#   (default '/')
# @return [Hash] { status: 'ok', quorum_queues:, leaders_on_this_node: }, or the
#   unreachable placeholder when the queue listing cannot be fetched
def fetch_quorum_leaders(http, settings)
  vhost = settings.dig(:connection, :vhost) || '/'
  # encode_www_form_component writes a space as '+', which is only meaningful in
  # a query string; inside a path segment a space must be '%20'. A literal '+'
  # has already become '%2B', so every '+' left over stands for a space.
  encoded_vhost = URI.encode_www_form_component(vhost).gsub('+', '%20')
  body = api_get(http, "/api/queues/#{encoded_vhost}", settings)
  return unreachable unless body

  quorum_queues = body.select { |q| q['type'] == 'quorum' }
  return { status: 'ok', quorum_queues: 0, leaders_on_this_node: 0 } if quorum_queues.empty?

  node_name = resolve_node_name(http, settings)
  # With no resolved node name nothing can be attributed to this node; comparing
  # against nil would count every queue that lacks a 'leader' entry.
  leaders_here = node_name ? quorum_queues.count { |q| q['leader'] == node_name } : 0
  { status: 'ok', quorum_queues: quorum_queues.size, leaders_on_this_node: leaders_here }
end


.fetch_shovel_links(http, settings) ⇒ Object

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 55

# Summarises the state of the shovels listed for the configured vhost.
#
# @param http [Object] client handed to api_get
# @param settings [Hash] transport settings; reads settings[:connection][:vhost]
#   (default '/')
# @return [Hash] { status:, total:, running: } where status is 'ok' when there
#   are no shovels or all of them are in state 'running', and 'warn' otherwise;
#   the unreachable placeholder when the listing cannot be fetched
def fetch_shovel_links(http, settings)
  vhost = settings.dig(:connection, :vhost) || '/'
  # encode_www_form_component writes a space as '+', which is only meaningful in
  # a query string; inside a path segment a space must be '%20'. A literal '+'
  # has already become '%2B', so every '+' left over stands for a space.
  encoded_vhost = URI.encode_www_form_component(vhost).gsub('+', '%20')
  body = api_get(http, "/api/shovels/#{encoded_vhost}", settings)
  return unreachable unless body

  running = body.count { |s| s['state'] == 'running' }
  total   = body.size
  status  = total.zero? || running == total ? 'ok' : 'warn'
  { status: status, total: total, running: running }
rescue StandardError
  # Deliberate best-effort: any error raised while reading the listing is
  # reported as "no shovels" rather than propagated to the caller.
  { status: 'ok', total: 0, running: 0 }
end

.log_warn(msg) ⇒ Object



117
118
119
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 117

# Forwards a message to the logger returned by +log+ at warn level.
#
# @param msg [String] text to log
# @return [Object] whatever the logger's warn call returns
def log_warn(msg)
  logger = log
  logger.warn(msg)
end

.resolve_node_name(http, settings) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 96

# Resolves the name of the RabbitMQ node that answers the management API.
#
# The name is taken from the 'node' entry of /api/overview. The previous
# implementation fetched /api/whoami (user details), discarded the result and
# then returned the first entry of /api/nodes, which in a multi-node cluster is
# not necessarily the node being queried.
#
# @param http [Object] client handed to api_get
# @param settings [Hash] transport settings handed to api_get
# @return [String, nil] the node name, or nil when it cannot be determined
def resolve_node_name(http, settings)
  overview = api_get(http, '/api/overview', settings)
  return nil unless overview

  reported = overview['node'] if overview.is_a?(Hash)
  return reported if reported

  # Legacy fallback when the overview carries no node name: guess the first
  # listed node. Only reliable on a single-node broker.
  nodes = api_get(http, '/api/nodes', settings)
  return nil unless nodes

  nodes.first&.dig('name')
end

.resolve_settings ⇒ Object



106
107
108
109
110
111
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 106

# Picks the transport settings used to reach the management API.
#
# @return [Hash] Legion::Settings[:transport] converted with to_h when
#   Legion::Settings is defined and responds to []; otherwise built-in defaults
#   pointing at a local broker with guest credentials
def resolve_settings
  if defined?(Legion::Settings) && Legion::Settings.respond_to?(:[])
    Legion::Settings[:transport].to_h
  else
    {
      connection:      { host: '127.0.0.1', user: 'guest', password: 'guest', vhost: '/' },
      management_port: 15_672
    }
  end
end

.unreachable ⇒ Object



113
114
115
# File 'lib/legion/extensions/node/helpers/rabbitmq.rb', line 113

# Placeholder result used when a probe gets no answer from the management API.
#
# @return [Hash] a new hash on every call with status 'unknown' and a fixed detail text
def unreachable
  {
    status: 'unknown',
    detail: 'management api unreachable'
  }
end