Module: Legion::Transport::Connection

Defined in:
lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb

Defined Under Namespace

Modules: SSL, Vault

Class Method Summary collapse

Class Method Details

.channelObject



62
63
64
65
66
67
68
# File 'lib/legion/transport/connection.rb', line 62

def channel
  return @channel_thread.value if !@channel_thread.value.nil? && @channel_thread.value.open?

  @channel_thread.value = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
  @channel_thread.value.prefetch(settings[:prefetch])
  @channel_thread.value
end

.channel_open?Boolean

Returns:

  • (Boolean)


79
80
81
82
83
# File 'lib/legion/transport/connection.rb', line 79

def channel_open?
  channel.open?
rescue StandardError
  false
end

.channel_threadObject



75
76
77
# File 'lib/legion/transport/connection.rb', line 75

def channel_thread
  channel
end

.connectorObject



17
18
19
# File 'lib/legion/transport/connection.rb', line 17

def connector
  Legion::Transport::CONNECTOR
end

.newObject



13
14
15
# File 'lib/legion/transport/connection.rb', line 13

def new
  clone
end

.reconnect(connection_name: 'Legion') ⇒ Object



21
22
23
24
25
# File 'lib/legion/transport/connection.rb', line 21

def reconnect(connection_name: 'Legion', **)
  @session = nil
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  setup(connection_name: connection_name)
end

.sessionObject



70
71
72
73
# File 'lib/legion/transport/connection.rb', line 70

def session
  nil if @session.nil?
  @session.value
end

.session_open?Boolean

Returns:

  • (Boolean)


85
86
87
88
89
# File 'lib/legion/transport/connection.rb', line 85

def session_open?
  session.open?
rescue StandardError
  false
end

.settingsObject



9
10
11
# File 'lib/legion/transport/connection.rb', line 9

def settings
  Legion::Settings[:transport]
end

.setup(connection_name: 'Legion') ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/transport/connection.rb', line 27

def setup(connection_name: 'Legion', **)
  Legion::Transport.logger.info("Using transport connector: #{Legion::Transport::CONNECTOR}")

  if @session.respond_to?(:value) && session.respond_to?(:closed?) && session.closed?
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  elsif @session.respond_to?(:value) && session.respond_to?(:closed?) && session.open?
    nil
  else
    @session ||= Concurrent::AtomicReference.new(
      connector.new(build_bunny_opts(connection_name: connection_name))
    )
    @channel_thread = Concurrent::ThreadLocalVar.new(nil)
    session.start
    session.create_channel(nil, settings[:channel][:session_worker_pool_size])
           .basic_qos(settings[:prefetch], true)
    Legion::Settings[:transport][:connected] = true
  end

  session.on_blocked { Legion::Transport.logger.warn('Legion::Transport is being blocked by RabbitMQ!') } if session.respond_to? :on_blocked

  if session.respond_to? :on_unblocked
    session.on_unblocked do
      Legion::Transport.logger.info('Legion::Transport is no longer being blocked by RabbitMQ')
    end
  end

  if session.respond_to? :after_recovery_completed
    session.after_recovery_completed do
      Legion::Transport.logger.info('Legion::Transport has completed recovery')
    end
  end

  true
end

.shutdownObject



91
92
93
94
# File 'lib/legion/transport/connection.rb', line 91

def shutdown
  session.close
  @session = nil
end