Module: Legion::Transport::Settings
- Defined in:
- lib/legion/transport/settings.rb
Constant Summary collapse
- DEFAULT_AMQP_PORT =
5672
Class Method Summary collapse
- .channel ⇒ Object
- .connection ⇒ Object
- .default ⇒ Object
- .exchanges ⇒ Object
- .grab_vault_creds ⇒ Object
- .messages ⇒ Object
- .queues ⇒ Object
- .resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object
- .tenant_topology ⇒ Object
Class Method Details
.channel ⇒ Object
65 66 67 68 69 70 |
# File 'lib/legion/transport/settings.rb', line 65 def self.channel { default_worker_pool_size: ENV['transport.channel.default_worker_pool_size'] || 1, session_worker_pool_size: ENV['transport.channel.session_worker_pool_size'] || 8 } end |
.connection ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/transport/settings.rb', line 8 def self.connection host = ENV['transport.connection.host'] || '127.0.0.1' port = (ENV['transport.connection.port'] || DEFAULT_AMQP_PORT).to_i existing = defined?(Legion::Settings) ? (Legion::Settings[:transport][:connection] || {}) : {} extra_server = existing[:server] extra_servers = existing[:servers] || [] extra_hosts = existing[:hosts] || [] { read_timeout: 1, heartbeat: (ENV['transport.connection.heartbeat'] || 30).to_i, automatically_recover: true, continuation_timeout: 4000, network_recovery_interval: (ENV['transport.connection.recovery_interval'] || 2).to_i, connection_timeout: (ENV['transport.connection.connection_timeout'] || 10).to_i, frame_max: 65_536, user: ENV['transport.connection.user'] || 'guest', password: ENV['transport.connection.password'] || 'guest', host: host, port: port, vhost: ENV['transport.connection.vhost'] || '/', recovery_attempts: 100, logger_level: ENV['transport.log_level'] || 'info', connected: false, resolved_hosts: resolve_hosts( host: host, hosts: Array(extra_hosts), server: extra_server, servers: Array(extra_servers), port: port ) }.merge(grab_vault_creds) end |
.default ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/legion/transport/settings.rb', line 112 def self.default cluster_csv = ENV.fetch('transport.cluster_nodes', '') { type: 'rabbitmq', connected: false, logger_level: ENV['transport.logger_level'] || 'info', messages: , prefetch: ENV['transport.prefetch'].to_i, exchanges: exchanges, queues: queues, connection: connection, channel: channel, tenant_topology: tenant_topology, cluster_nodes: cluster_csv.empty? ? [] : cluster_csv.split(',').map(&:strip), connection_pool_size: (ENV['transport.connection_pool_size'] || 1).to_i, region: ENV.fetch('transport.region', nil), management_port: (ENV['transport.management_port'] || 15_672).to_i, quorum_queue_policy: { enabled: ENV['transport.quorum_queue_policy.enabled'] == 'true', pattern: ENV['transport.quorum_queue_policy.pattern'] || '^legion\\.', delivery_limit: (ENV['transport.quorum_queue_policy.delivery_limit'] || 5).to_i } } end |
.exchanges ⇒ Object
83 84 85 86 87 88 89 90 91 |
# File 'lib/legion/transport/settings.rb', line 83 def self.exchanges { type: 'topic', arguments: {}, auto_delete: false, durable: true, internal: false } end |
.grab_vault_creds ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/legion/transport/settings.rb', line 53 def self.grab_vault_creds return {} unless Legion::Settings[:crypt][:vault][:connected] Legion::Transport.logger.info 'Attempting to grab RabbitMQ creds from vault' lease = Legion::Crypt.read('rabbitmq/creds/legion', type: nil) Legion::Transport.logger.debug 'successfully grabbed amqp username from Vault' { user: lease[:username], password: lease[:password] } rescue StandardError Legion::Transport.logger.warn 'Error reading rabbitmq creds from vault' {} end |
.messages ⇒ Object
93 94 95 96 97 98 99 100 |
# File 'lib/legion/transport/settings.rb', line 93 def self. { encrypt: ENV['transport.messages.encrypt'] == 'true', ttl: ENV.fetch('transport.messages.ttl', nil), priority: ENV['transport.messages.priority'].to_i, persistent: ENV['transport.messages.persistent'] == 'true' } end |
.queues ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/transport/settings.rb', line 72 def self.queues { manual_ack: true, durable: true, exclusive: false, block: false, auto_delete: false, arguments: { 'x-queue-type': 'quorum' } } end |
.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/legion/transport/settings.rb', line 43 def self.resolve_hosts(host: nil, hosts: [], server: nil, servers: [], port: nil) port ||= DEFAULT_AMQP_PORT all = Array(hosts) + Array(servers) + Array(host) + Array(server) all = ["127.0.0.1:#{port}"] if all.empty? all.map! { |s| s.to_s.include?(':') ? s.to_s : "#{s}:#{port}" } all.uniq end |
.tenant_topology ⇒ Object
102 103 104 105 106 107 108 109 110 |
# File 'lib/legion/transport/settings.rb', line 102 def self.tenant_topology { enabled: false, prefix_format: 't.%<tenant_id>s.', shared_exchanges: %w[legion.control legion.health legion.audit], auto_provision: true, quotas: {} } end |