Module: Legion::Transport::Settings

Defined in:
lib/legion/transport/settings.rb

Constant Summary collapse

DEFAULT_AMQP_PORT =
5672

Class Method Summary collapse

Class Method Details

.channelObject



65
66
67
68
69
70
# File 'lib/legion/transport/settings.rb', line 65

def self.channel
  {
    default_worker_pool_size: ENV['transport.channel.default_worker_pool_size'] || 1,
    session_worker_pool_size: ENV['transport.channel.session_worker_pool_size'] || 8
  }
end

.connectionObject



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/transport/settings.rb', line 8

def self.connection
  host = ENV['transport.connection.host'] || '127.0.0.1'
  port = (ENV['transport.connection.port'] || DEFAULT_AMQP_PORT).to_i

  existing = defined?(Legion::Settings) ? (Legion::Settings[:transport][:connection] || {}) : {}
  extra_server  = existing[:server]
  extra_servers = existing[:servers] || []
  extra_hosts   = existing[:hosts] || []

  {
    read_timeout:              1,
    heartbeat:                 (ENV['transport.connection.heartbeat'] || 30).to_i,
    automatically_recover:     true,
    continuation_timeout:      4000,
    network_recovery_interval: (ENV['transport.connection.recovery_interval'] || 2).to_i,
    connection_timeout:        (ENV['transport.connection.connection_timeout'] || 10).to_i,
    frame_max:                 65_536,
    user:                      ENV['transport.connection.user'] || 'guest',
    password:                  ENV['transport.connection.password'] || 'guest',
    host:                      host,
    port:                      port,
    vhost:                     ENV['transport.connection.vhost'] || '/',
    recovery_attempts:         100,
    logger_level:              ENV['transport.log_level'] || 'info',
    connected:                 false,
    resolved_hosts:            resolve_hosts(
      host: host, hosts: Array(extra_hosts),
      server: extra_server, servers: Array(extra_servers),
      port: port
    )
  }.merge(grab_vault_creds)
end

.defaultObject



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/legion/transport/settings.rb', line 112

def self.default
  cluster_csv = ENV.fetch('transport.cluster_nodes', '')
  {
    type:                 'rabbitmq',
    connected:            false,
    logger_level:         ENV['transport.logger_level'] || 'info',
    messages:             messages,
    prefetch:             ENV['transport.prefetch'].to_i,
    exchanges:            exchanges,
    queues:               queues,
    connection:           connection,
    channel:              channel,
    tenant_topology:      tenant_topology,
    cluster_nodes:        cluster_csv.empty? ? [] : cluster_csv.split(',').map(&:strip),
    connection_pool_size: (ENV['transport.connection_pool_size'] || 1).to_i,
    region:               ENV.fetch('transport.region', nil),
    management_port:      (ENV['transport.management_port'] || 15_672).to_i,
    quorum_queue_policy:  {
      enabled:        ENV['transport.quorum_queue_policy.enabled'] == 'true',
      pattern:        ENV['transport.quorum_queue_policy.pattern'] || '^legion\\.',
      delivery_limit: (ENV['transport.quorum_queue_policy.delivery_limit'] || 5).to_i
    }
  }
end

.exchangesObject



83
84
85
86
87
88
89
90
91
# File 'lib/legion/transport/settings.rb', line 83

def self.exchanges
  {
    type:        'topic',
    arguments:   {},
    auto_delete: false,
    durable:     true,
    internal:    false
  }
end

.grab_vault_credsObject



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/legion/transport/settings.rb', line 53

def self.grab_vault_creds
  return {} unless Legion::Settings[:crypt][:vault][:connected]

  Legion::Transport.logger.info 'Attempting to grab RabbitMQ creds from vault'
  lease = Legion::Crypt.read('rabbitmq/creds/legion', type: nil)
  Legion::Transport.logger.debug 'successfully grabbed amqp username from Vault'
  { user: lease[:username], password: lease[:password] }
rescue StandardError
  Legion::Transport.logger.warn 'Error reading rabbitmq creds from vault'
  {}
end

.messagesObject



93
94
95
96
97
98
99
100
# File 'lib/legion/transport/settings.rb', line 93

def self.messages
  {
    encrypt:    ENV['transport.messages.encrypt'] == 'true',
    ttl:        ENV.fetch('transport.messages.ttl', nil),
    priority:   ENV['transport.messages.priority'].to_i,
    persistent: ENV['transport.messages.persistent'] == 'true'
  }
end

.queuesObject



72
73
74
75
76
77
78
79
80
81
# File 'lib/legion/transport/settings.rb', line 72

def self.queues
  {
    manual_ack:  true,
    durable:     true,
    exclusive:   false,
    block:       false,
    auto_delete: false,
    arguments:   { 'x-queue-type': 'quorum' }
  }
end

.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/legion/transport/settings.rb', line 43

def self.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil)
  port ||= DEFAULT_AMQP_PORT

  all = Array(hosts) + Array(servers) + Array(host) + Array(server)
  all = ["127.0.0.1:#{port}"] if all.empty?

  all.map! { |s| s.to_s.include?(':') ? s.to_s : "#{s}:#{port}" }
  all.uniq
end

.tenant_topologyObject



102
103
104
105
106
107
108
109
110
# File 'lib/legion/transport/settings.rb', line 102

def self.tenant_topology
  {
    enabled:          false,
    prefix_format:    't.%<tenant_id>s.',
    shared_exchanges: %w[legion.control legion.health legion.audit],
    auto_provision:   true,
    quotas:           {}
  }
end