Module: Legion::Transport::Settings
- Defined in:
- lib/legion/transport/settings.rb
Constant Summary collapse
- DEFAULT_AMQP_PORT =
5672
Class Method Summary collapse
- .channel ⇒ Object
- .connection ⇒ Object
- .default ⇒ Object
- .exchanges ⇒ Object
- .grab_vault_creds ⇒ Object
- .messages ⇒ Object
- .queues ⇒ Object
- .resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object
Class Method Details
.channel ⇒ Object
65 66 67 68 69 70 |
# File 'lib/legion/transport/settings.rb', line 65 def self.channel { default_worker_pool_size: ENV['transport.channel.default_worker_pool_size'] || 1, session_worker_pool_size: ENV['transport.channel.session_worker_pool_size'] || 8 } end |
.connection ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/transport/settings.rb', line 8 def self.connection host = ENV['transport.connection.host'] || '127.0.0.1' port = (ENV['transport.connection.port'] || DEFAULT_AMQP_PORT).to_i existing = defined?(Legion::Settings) ? (Legion::Settings[:transport][:connection] || {}) : {} extra_server = existing[:server] extra_servers = existing[:servers] || [] extra_hosts = existing[:hosts] || [] { read_timeout: 1, heartbeat: 30, automatically_recover: true, continuation_timeout: 4000, network_recovery_interval: 1, connection_timeout: 1, frame_max: 65_536, user: ENV['transport.connection.user'] || 'guest', password: ENV['transport.connection.password'] || 'guest', host: host, port: port, vhost: ENV['transport.connection.vhost'] || '/', recovery_attempts: 100, logger_level: ENV['transport.log_level'] || 'info', connected: false, resolved_hosts: resolve_hosts( host: host, hosts: Array(extra_hosts), server: extra_server, servers: Array(extra_servers), port: port ) }.merge(grab_vault_creds) end |
.default ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/legion/transport/settings.rb', line 102 def self.default { type: 'rabbitmq', connected: false, logger_level: ENV['transport.logger_level'] || 'info', messages: , prefetch: ENV['transport.prefetch'].to_i, exchanges: exchanges, queues: queues, connection: connection, channel: channel } end |
.exchanges ⇒ Object
83 84 85 86 87 88 89 90 91 |
# File 'lib/legion/transport/settings.rb', line 83 def self.exchanges { type: 'topic', arguments: {}, auto_delete: false, durable: true, internal: false } end |
.grab_vault_creds ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/legion/transport/settings.rb', line 53 def self.grab_vault_creds return {} unless Legion::Settings[:crypt][:vault][:connected] Legion::Transport.logger.info 'Attempting to grab RabbitMQ creds from vault' lease = Legion::Crypt.read('rabbitmq/creds/legion', type: nil) Legion::Transport.logger.debug 'successfully grabbed amqp username from Vault' { user: lease[:username], password: lease[:password] } rescue StandardError Legion::Transport.logger.warn 'Error reading rabbitmq creds from vault' {} end |
.messages ⇒ Object
93 94 95 96 97 98 99 100 |
# File 'lib/legion/transport/settings.rb', line 93 def self. { encrypt: ENV['transport.messages.encrypt'] == 'true', ttl: ENV.fetch('transport.messages.ttl', nil), priority: ENV['transport.messages.priority'].to_i, persistent: ENV['transport.messages.persistent'] == 'true' } end |
.queues ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/transport/settings.rb', line 72 def self.queues { manual_ack: true, durable: true, exclusive: false, block: false, auto_delete: false, arguments: { 'x-queue-type': 'quorum' } } end |
.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/legion/transport/settings.rb', line 43 def self.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) port ||= DEFAULT_AMQP_PORT all = Array(hosts) + Array(servers) + Array(host) + Array(server) all = ["127.0.0.1:#{port}"] if all.empty? all.map! { |s| s.to_s.include?(':') ? s.to_s : "#{s}:#{port}" } all.uniq end |