Module: Legion::Transport::Settings

Defined in:
lib/legion/transport/settings.rb

Constant Summary collapse

DEFAULT_AMQP_PORT =
5672

Class Method Summary collapse

Class Method Details

.channel ⇒ Object



65
66
67
68
69
70
# File 'lib/legion/transport/settings.rb', line 65

# Default channel worker-pool sizes for the transport layer.
#
# Each value is read from its environment variable when set; otherwise the
# built-in default is used. ENV values are always Strings, so both values are
# coerced with #to_i to give callers an Integer regardless of the source
# (same handling as the connection port).
#
# @return [Hash{Symbol => Integer}] :default_worker_pool_size (default 1) and
#   :session_worker_pool_size (default 8)
def self.channel
  {
    default_worker_pool_size: (ENV['transport.channel.default_worker_pool_size'] || 1).to_i,
    session_worker_pool_size: (ENV['transport.channel.session_worker_pool_size'] || 8).to_i
  }
end

.connection ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/transport/settings.rb', line 8

# Builds the default connection settings hash.
#
# Host, port, user, password, vhost and log level come from
# `transport.connection.*` / `transport.log_level` environment variables, with
# hard-coded fallbacks. Any :server, :servers and :hosts entries already
# present under Legion::Settings[:transport][:connection] are passed to
# resolve_hosts to build :resolved_hosts. Finally the result of
# grab_vault_creds is merged in, so credentials returned by it override the
# ENV/default :user and :password.
#
# @return [Hash] connection settings
def self.connection
  host = ENV['transport.connection.host'] || '127.0.0.1'
  port = (ENV['transport.connection.port'] || DEFAULT_AMQP_PORT).to_i

  # Guard each lookup step: a missing :transport section would otherwise
  # raise NoMethodError on nil instead of falling back to "no extra hosts".
  transport = defined?(Legion::Settings) ? Legion::Settings[:transport] : nil
  existing  = (transport && transport[:connection]) || {}
  extra_server  = existing[:server]
  extra_servers = existing[:servers] || []
  extra_hosts   = existing[:hosts] || []

  {
    read_timeout:              1,
    heartbeat:                 30,
    automatically_recover:     true,
    continuation_timeout:      4000,
    network_recovery_interval: 1,
    connection_timeout:        1,
    frame_max:                 65_536,
    user:                      ENV['transport.connection.user'] || 'guest',
    password:                  ENV['transport.connection.password'] || 'guest',
    host:                      host,
    port:                      port,
    vhost:                     ENV['transport.connection.vhost'] || '/',
    recovery_attempts:         100,
    logger_level:              ENV['transport.log_level'] || 'info',
    connected:                 false,
    resolved_hosts:            resolve_hosts(
      host: host, hosts: Array(extra_hosts),
      server: extra_server, servers: Array(extra_servers),
      port: port
    )
  }.merge(grab_vault_creds)
end

.default ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/legion/transport/settings.rb', line 102

# Assembles the complete default transport settings.
#
# The log level and prefetch count are read from the
# `transport.logger_level` and `transport.prefetch` environment variables
# (falling back to 'info' and, via nil.to_i, 0). The nested sections are
# produced by the sibling builders of the same name.
#
# @return [Hash] top-level transport settings
def self.default
  settings = { type: 'rabbitmq', connected: false }
  settings[:logger_level] = ENV['transport.logger_level'] || 'info'
  settings[:messages]     = messages
  settings[:prefetch]     = ENV['transport.prefetch'].to_i
  settings[:exchanges]    = exchanges
  settings[:queues]       = queues
  settings[:connection]   = connection
  settings[:channel]      = channel
  settings
end

.exchanges ⇒ Object



83
84
85
86
87
88
89
90
91
# File 'lib/legion/transport/settings.rb', line 83

# Default exchange options: a durable 'topic' exchange with no extra
# arguments that is neither auto-deleted nor internal.
#
# @return [Hash] a fresh options hash on every call
def self.exchanges
  options = { type: 'topic', arguments: {} }
  options[:auto_delete] = false
  options[:durable]     = true
  options[:internal]    = false
  options
end

.grab_vault_creds ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/legion/transport/settings.rb', line 53

# Fetches RabbitMQ credentials from Vault when Vault is marked connected.
#
# Best-effort: any failure is logged and an empty hash is returned, so the
# caller can merge the result without overriding its existing credentials.
#
# @return [Hash] `{ user:, password: }` when a lease with both values was
#   read; `{}` when Vault is not connected, the crypt/vault settings are
#   absent, the lease is incomplete, or an error occurred
def self.grab_vault_creds
  # Walk the settings one level at a time so absent :crypt / :vault sections
  # mean "Vault not in use" rather than being reported as a read error.
  crypt = defined?(Legion::Settings) ? Legion::Settings[:crypt] : nil
  vault = crypt && crypt[:vault]
  return {} unless vault && vault[:connected]

  Legion::Transport.logger.info 'Attempting to grab RabbitMQ creds from vault'
  lease = Legion::Crypt.read('rabbitmq/creds/legion', type: nil)
  username = lease && lease[:username]
  password = lease && lease[:password]

  # An incomplete lease must not yield nil credentials: the caller merges this
  # hash over its defaults, which would replace them with nil.
  unless username && password
    Legion::Transport.logger.warn 'Error reading rabbitmq creds from vault'
    return {}
  end

  Legion::Transport.logger.debug 'successfully grabbed amqp username from Vault'
  { user: username, password: password }
rescue StandardError => e
  Legion::Transport.logger.warn 'Error reading rabbitmq creds from vault'
  # Keep the cause available for troubleshooting without changing the warning.
  Legion::Transport.logger.debug "vault credential lookup failed: #{e.class}: #{e.message}"
  {}
end

.messages ⇒ Object



93
94
95
96
97
98
99
100
# File 'lib/legion/transport/settings.rb', line 93

# Default message options, each read from a `transport.messages.*`
# environment variable.
#
# :encrypt and :persistent are true only when the variable is exactly
# 'true'; :ttl is the raw variable value (nil when unset); :priority is the
# variable converted with #to_i (0 when unset).
#
# @return [Hash] message defaults
def self.messages
  env_true = ->(name) { ENV[name] == 'true' }

  {
    encrypt:    env_true.call('transport.messages.encrypt'),
    ttl:        ENV['transport.messages.ttl'],
    priority:   ENV['transport.messages.priority'].to_i,
    persistent: env_true.call('transport.messages.persistent')
  }
end

.queues ⇒ Object



72
73
74
75
76
77
78
79
80
81
# File 'lib/legion/transport/settings.rb', line 72

# Default queue options: durable, manually acknowledged, non-exclusive,
# non-blocking, not auto-deleted, declared with the 'x-queue-type' argument
# set to 'quorum'.
#
# @return [Hash] a fresh options hash on every call
def self.queues
  queue_arguments = { 'x-queue-type': 'quorum' }
  flags = { manual_ack: true, durable: true, exclusive: false, block: false, auto_delete: false }
  flags.merge(arguments: queue_arguments)
end

.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/legion/transport/settings.rb', line 43

# Builds a de-duplicated list of "host:port" endpoint strings.
#
# Entries are taken in the order hosts, servers, host, server. Each entry is
# converted to a String and stripped; nil and blank entries are dropped so
# they cannot produce an endpoint like ":5672". An entry that already
# contains ':' is kept as-is; otherwise ":<port>" is appended. When nothing
# usable remains, "127.0.0.1:<port>" is returned.
#
# NOTE(review): any ':' is treated as "already has a port", so a bare IPv6
# literal would be passed through without a port — confirm whether IPv6
# endpoints need support.
#
# @param host [String, nil] single host
# @param hosts [Array<String>, String, nil] additional hosts
# @param server [String, nil] single server
# @param servers [Array<String>, String, nil] additional servers
# @param port [Integer, String, nil] port for entries lacking one; defaults
#   to DEFAULT_AMQP_PORT
# @return [Array<String>] unique endpoints in first-seen order
def self.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil)
  port ||= DEFAULT_AMQP_PORT

  candidates = Array(hosts) + Array(servers) + Array(host) + Array(server)
  endpoints = candidates.map { |entry| entry.to_s.strip }.reject(&:empty?)
  # Apply the fallback after filtering so blank-only input still gets it.
  endpoints = ['127.0.0.1'] if endpoints.empty?

  endpoints.map { |endpoint| endpoint.include?(':') ? endpoint : "#{endpoint}:#{port}" }.uniq
end