Module: Legion::Transport::Connection
- Extended by:
- SSL
- Defined in:
- lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb
Defined Under Namespace
Class Method Summary collapse
- .channel ⇒ Object
- .channel_open? ⇒ Boolean
- .channel_thread ⇒ Object
- .connector ⇒ Object
- .new ⇒ Object
- .reconnect(connection_name: 'Legion') ⇒ Object
- .session ⇒ Object
- .session_open? ⇒ Boolean
- .settings ⇒ Object
- .setup(connection_name: 'Legion') ⇒ Object
- .shutdown ⇒ Object
Methods included from SSL
Class Method Details
.channel ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/legion/transport/connection.rb', line 60 def channel return @channel_thread.value if !@channel_thread.value.nil? && @channel_thread.value.open? @channel_thread.value = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10) @channel_thread.value.prefetch(settings[:prefetch]) Legion::Logging.debug "Channel created for thread #{Thread.current.object_id}" if defined?(Legion::Logging) @channel_thread.value end |
.channel_open? ⇒ Boolean
78 79 80 81 82 83 |
# File 'lib/legion/transport/connection.rb', line 78 def channel_open? channel.open? rescue StandardError => e Legion::Logging.debug("Connection#channel_open? failed: #{e.}") if defined?(Legion::Logging) false end |
.channel_thread ⇒ Object
74 75 76 |
# File 'lib/legion/transport/connection.rb', line 74 def channel_thread channel end |
.connector ⇒ Object
20 21 22 |
# File 'lib/legion/transport/connection.rb', line 20 def connector Legion::Transport::CONNECTOR end |
.new ⇒ Object
16 17 18 |
# File 'lib/legion/transport/connection.rb', line 16 def new clone end |
.reconnect(connection_name: 'Legion') ⇒ Object
24 25 26 27 28 |
# File 'lib/legion/transport/connection.rb', line 24 def reconnect(connection_name: 'Legion', **) @session = nil @channel_thread = Concurrent::ThreadLocalVar.new(nil) setup(connection_name: connection_name) end |
.session ⇒ Object
69 70 71 72 |
# File 'lib/legion/transport/connection.rb', line 69 def session nil if @session.nil? @session.value end |
.session_open? ⇒ Boolean
85 86 87 88 89 90 |
# File 'lib/legion/transport/connection.rb', line 85 def session_open? session.open? rescue StandardError => e Legion::Logging.debug("Connection#session_open? failed: #{e.}") if defined?(Legion::Logging) false end |
.settings ⇒ Object
12 13 14 |
# File 'lib/legion/transport/connection.rb', line 12 def settings Legion::Settings[:transport] end |
.setup(connection_name: 'Legion') ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/legion/transport/connection.rb', line 30 def setup(connection_name: 'Legion', **) Legion::Transport.logger.info("Using transport connector: #{Legion::Transport::CONNECTOR}") if @session.respond_to?(:value) && session.respond_to?(:closed?) && session.closed? @channel_thread = Concurrent::ThreadLocalVar.new(nil) elsif @session.respond_to?(:value) && session.respond_to?(:closed?) && session.open? nil else @session ||= Concurrent::AtomicReference.new( create_session_with_failover(connection_name: connection_name) ) @channel_thread = Concurrent::ThreadLocalVar.new(nil) session.start session.create_channel(nil, settings[:channel][:session_worker_pool_size]) .basic_qos(settings[:prefetch], true) Legion::Settings[:transport][:connected] = true if defined?(Legion::Logging) host = settings.dig(:connection, :host) || '127.0.0.1' port = settings.dig(:connection, :port) || 5672 user = settings.dig(:connection, :user) || 'guest' vhost = settings.dig(:connection, :vhost) || '/' Legion::Logging.info "Connected to amqp://#{user}@#{host}:#{port}/#{vhost}" end end register_session_callbacks apply_quorum_policy_if_enabled true end |