Module: Legion::Transport::Connection

Extended by:
SSL
Defined in:
lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb

Defined Under Namespace

Modules: SSL, Vault

Class Method Summary

Methods included from SSL

tls_options

Class Method Details

.channel ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/legion/transport/connection.rb', line 56

# Returns the channel held in @channel_thread, creating a new one when the
# stored channel is missing or no longer reports open?.
#
# A newly created channel is stored back into @channel_thread and has its
# prefetch set from settings[:prefetch] before being returned.
#
# @return [Object] the channel created via session.create_channel (or the cached one)
def channel
  cached = @channel_thread.value
  return cached if cached&.open?

  # Arguments are passed positionally, exactly as before: nil, the configured
  # default worker pool size, false, and 10.
  fresh = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
  @channel_thread.value = fresh
  fresh.prefetch(settings[:prefetch])
  Legion::Logging.debug "Channel created for thread #{Thread.current.object_id}" if defined?(Legion::Logging)
  fresh
end

.channel_open? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
77
78
# File 'lib/legion/transport/connection.rb', line 74

# Reports whether the channel returned by `channel` is open.
#
# Because this goes through `channel`, a new channel may be created as a
# side effect when the cached one is missing or closed. Any StandardError
# raised while obtaining or probing the channel is reported as false.
#
# @return [Boolean] the result of `open?` on the channel, or false on error
def channel_open?
  current = channel
  current.open?
rescue StandardError
  false
end

.channel_thread ⇒ Object



70
71
72
# File 'lib/legion/transport/connection.rb', line 70

# Delegates to `channel` and returns whatever it returns.
#
# @return [Object] the value returned by `channel`
def channel_thread
  channel
end

.connector ⇒ Object



20
21
22
# File 'lib/legion/transport/connection.rb', line 20

# Returns the value of the Legion::Transport::CONNECTOR constant.
#
# @return [Object] Legion::Transport::CONNECTOR
def connector
  Legion::Transport::CONNECTOR
end

.new ⇒ Object



16
17
18
# File 'lib/legion/transport/connection.rb', line 16

# Returns a copy of the receiver produced by `clone` rather than
# instantiating anything.
#
# @return [Object] the result of `clone`
def new
  clone
end

.reconnect(connection_name: 'Legion') ⇒ Object



24
25
26
27
28
# File 'lib/legion/transport/connection.rb', line 24

# Discards the stored session reference and channel holder, then runs
# `setup` again so a new session is built.
#
# The previous session object is only dereferenced here; it is not closed.
# Extra keyword arguments are accepted and ignored.
#
# @param connection_name [String] forwarded to `setup`
# @return [Object] whatever `setup` returns
def reconnect(connection_name: 'Legion', **)
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  @session = nil
  setup(connection_name: connection_name)
end

.session ⇒ Object



65
66
67
68
# File 'lib/legion/transport/connection.rb', line 65

# Returns the session object wrapped by @session, or nil when no session
# reference has been stored yet.
#
# @return [Object, nil] `@session.value`, or nil if @session is nil
def session
  # The guard must return explicitly: a bare `nil if ...` is a no-op
  # expression and would fall through to calling `value` on nil.
  return nil if @session.nil?

  @session.value
end

.session_open? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
84
# File 'lib/legion/transport/connection.rb', line 80

# Reports whether the object returned by `session` is open.
#
# Any StandardError raised while fetching or probing the session (including
# the session being nil) is reported as false.
#
# @return [Boolean] the result of `open?` on the session, or false on error
def session_open?
  current = session
  current.open?
rescue StandardError
  false
end

.settings ⇒ Object



12
13
14
# File 'lib/legion/transport/connection.rb', line 12

# Returns the :transport entry of Legion::Settings.
#
# @return [Object] Legion::Settings[:transport]
def settings
  Legion::Settings[:transport]
end

.setup(connection_name: 'Legion') ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/legion/transport/connection.rb', line 30

# Ensures a transport session exists and is started.
#
# Three cases are handled, in this order:
# * a stored session that reports closed? -> only @channel_thread is replaced
#   with a fresh, empty holder;
# * a stored session that reports open?   -> nothing is rebuilt;
# * anything else -> a session is created if none is stored, started, a
#   channel is opened on it with basic_qos applied, and the transport
#   settings are flagged as connected.
#
# Session callbacks and the quorum-policy hook run in every case. Extra
# keyword arguments are accepted and ignored.
#
# @param connection_name [String] forwarded to create_session_with_failover
# @return [true]
def setup(connection_name: 'Legion', **)
  Legion::Transport.logger.info("Using transport connector: #{Legion::Transport::CONNECTOR}")

  # True only when @session is a value holder whose content can be asked
  # about its connection state.
  inspectable = @session.respond_to?(:value) && session.respond_to?(:closed?)

  if inspectable && session.closed?
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  elsif !(inspectable && session.open?)
    @session ||= Concurrent::AtomicReference.new(create_session_with_failover(connection_name: connection_name))
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
    session.start

    qos_channel = session.create_channel(nil, settings[:channel][:session_worker_pool_size])
    qos_channel.basic_qos(settings[:prefetch], true)

    Legion::Settings[:transport][:connected] = true
    amqp_host = settings.dig(:connection, :host) || '127.0.0.1'
    amqp_port = settings.dig(:connection, :port) || 5672
    Legion::Logging.info "Connected to amqp://#{amqp_host}:#{amqp_port}" if defined?(Legion::Logging)
  end

  register_session_callbacks
  apply_quorum_policy_if_enabled
  true
end

.shutdown ⇒ Object



86
87
88
89
90
# File 'lib/legion/transport/connection.rb', line 86

# Closes the current session, if any, and clears the stored reference.
#
# Safe to call when no session has been set up or when shutdown has already
# run: in that case nothing is closed and the method still returns nil.
#
# @return [nil]
def shutdown
  Legion::Logging.info 'Transport connection shutting down' if defined?(Legion::Logging)
  # Check @session directly so a missing session never reaches `close` on nil,
  # and tolerate a holder whose value is nil.
  session&.close unless @session.nil?
  @session = nil
end