15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/legion/transport/helpers/policy.rb', line 15
# Creates or updates the `legion-quorum` policy by sending an HTTP PUT to the
# broker's management API (presumably RabbitMQ's — confirm against deployment).
#
# The policy targets queues whose name matches the configured pattern and
# carries a queue type of `quorum` plus a delivery limit.
#
# @param settings [Hash, nil] transport settings; falls back to
#   `Legion::Settings[:transport]` when nil. Keys read here:
#   `:quorum_queue_policy` (`:enabled`, `:pattern`, `:delivery_limit`),
#   `:connection` (`:host`, `:user`, `:password`, `:vhost`) and
#   `:management_port`.
# @return [Boolean] true when the API answered with a 2xx status; false when
#   the policy is absent or disabled, when the API answered with any other
#   status, or when a StandardError was raised (the error is passed to
#   `handle_exception` and not re-raised).
def apply_quorum_policy!(settings: nil)
  settings ||= Legion::Settings[:transport]
  policy = settings[:quorum_queue_policy]
  return false unless policy && policy[:enabled]

  conn = settings[:connection]
  host = conn[:host] || '127.0.0.1'
  port = settings[:management_port] || 15_672
  user = conn[:user] || 'guest'
  pass = conn[:password] || 'guest'
  vhost = conn[:vhost] || '/'
  # Resolved once so the request body and the log line always agree.
  pattern = policy[:pattern] || '^legion\\.'

  # The vhost is embedded as a URI *path segment*. Form encoding turns a space
  # into '+', which is a literal plus sign inside a path, so rewrite it to
  # '%20'. A real '+' in the vhost is already emitted as '%2B', which makes
  # this substitution unambiguous. The default '/' still becomes '%2F'.
  encoded_vhost = URI.encode_www_form_component(vhost).gsub('+', '%20')
  uri = URI("http://#{host}:#{port}/api/policies/#{encoded_vhost}/legion-quorum")

  # NOTE(review): RabbitMQ policy definitions normally use un-prefixed keys
  # (e.g. `delivery-limit`); confirm the broker accepts these `x-` keys.
  body = {
    pattern: pattern,
    definition: {
      'x-queue-type': 'quorum',
      'x-delivery-limit': policy[:delivery_limit] || 5
    },
    'apply-to': 'queues',
    priority: 0
  }

  req = Net::HTTP::Put.new(uri)
  req.basic_auth(user, pass)
  req.content_type = 'application/json'
  req.body = ::JSON.dump(body)

  http = Net::HTTP.new(uri.host, uri.port)
  # Short timeouts: an unreachable management API fails fast instead of hanging.
  http.open_timeout = 5
  http.read_timeout = 5

  response = http.request(req)
  applied = response.code.start_with?('2')
  log.info("Quorum policy applied pattern=#{pattern} host=#{host}:#{port}") if applied
  applied
rescue StandardError => e
  # Best effort: report the failure and answer false rather than raising.
  # host/port/vhost are nil here if the error happened before they were set.
  handle_exception(e, level: :warn, handled: true, operation: 'transport.policy.apply_quorum',
                      host: host, port: port, vhost: vhost)
  false
end
|