Module: Legion::Transport::Connection
- Defined in:
- lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb
Defined Under Namespace
Constant Summary collapse
- RECOVERY_WINDOW =
60- MAX_RECOVERIES_PER_WINDOW =
5- MAX_PUBLISHER_CHANNELS =
128
Class Method Summary collapse
- .build_channel ⇒ Object
- .build_session_open? ⇒ Boolean
- .channel ⇒ Object
- .channel_open? ⇒ Boolean
- .channel_registry_size ⇒ Object
- .channel_thread ⇒ Object
- .close_build_session ⇒ Object
- .connector ⇒ Object
- .create_dedicated_session(name: 'legion-dedicated') ⇒ Object
- .force_reconnect(connection_name: 'Legion') ⇒ Object
- .lite_mode? ⇒ Boolean
- .log_channel ⇒ Object
- .new ⇒ Object
- .on_force_reconnect(&block) ⇒ Object
- .open_build_session(connection_name: 'Legion::Build') ⇒ Object
- .reconnect(connection_name: 'Legion') ⇒ Object
- .session ⇒ Object
- .session_open? ⇒ Boolean
- .settings ⇒ Object
- .setup(connection_name: 'Legion') ⇒ Object
- .shutdown ⇒ Object
Methods included from Vault
vault_pki_enabled?, vault_pki_tls_options
Methods included from SSL
Class Method Details
.build_channel ⇒ Object
213 214 215 216 217 218 219 220 221 222 |
# File 'lib/legion/transport/connection.rb', line 213 def build_channel return channel unless @build_session return @build_channel_thread.value if @build_channel_thread.value&.open? @build_channel_thread.value = @build_session.value.create_channel( nil, settings[:channel][:default_worker_pool_size], false, 10 ) @build_channel_thread.value.prefetch(settings[:prefetch]) @build_channel_thread.value end |
.build_session_open? ⇒ Boolean
245 246 247 |
# File 'lib/legion/transport/connection.rb', line 245 def build_session_open? @build_session&.value&.open? == true end |
.channel ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/legion/transport/connection.rb', line 63 def channel # Build threads route to build session return build_channel if Thread.current[:legion_build_session] && @build_session if @pool sess = @pool.checkout begin start_session(sess) ch = sess.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10) ch.prefetch(settings[:prefetch]) return ch rescue StandardError => e safe_close_channel(ch) handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel', pooled: true) raise ensure @pool.checkin(sess) if sess end end return @channel_thread.value if !@channel_thread.value.nil? && @channel_thread.value.open? s = session raise IOError, 'transport session unavailable (recovery in progress)' unless s&.open? sweep_dead_thread_channels current_size = channel_registry_size if current_size >= MAX_PUBLISHER_CHANNELS log.warn "Channel registry at capacity (size=#{current_size}, max=#{MAX_PUBLISHER_CHANNELS}); " \ 'RabbitMQ channel_max exhaustion risk — investigate thread lifecycle' end @channel_thread.value = s.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10) @channel_thread.value.prefetch(settings[:prefetch]) track_channel(Thread.current, @channel_thread.value) log.debug "Channel created for thread #{Thread.current.object_id} (tracked=#{channel_registry_size})" @channel_thread.value end |
.channel_open? ⇒ Boolean
113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/legion/transport/connection.rb', line 113 def channel_open? # In pool mode, channels are not cached in @channel_thread; check the primary session instead. return session_open? if @pool current_channel = @channel_thread&.value return false unless current_channel current_channel.open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel_open?') false end |
.channel_registry_size ⇒ Object
289 290 291 |
# File 'lib/legion/transport/connection.rb', line 289 def channel_registry_size (@channel_registry ||= Concurrent::Hash.new).size end |
.channel_thread ⇒ Object
109 110 111 |
# File 'lib/legion/transport/connection.rb', line 109 def channel_thread channel end |
.close_build_session ⇒ Object
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/legion/transport/connection.rb', line 224 def close_build_session return unless @build_session s = @build_session.value Timeout.timeout(10) { s.close } if s&.open? @build_session = nil @build_channel_thread = nil log.info 'Build session closed (all build channels released)' rescue Timeout::Error => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.close_build_session') bs = @build_session&.value safely_close_build_transport(bs) @build_session = nil @build_channel_thread = nil rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.close_build_session') @build_session = nil @build_channel_thread = nil end |
.connector ⇒ Object
33 34 35 |
# File 'lib/legion/transport/connection.rb', line 33 def connector Legion::Transport::CONNECTOR end |
.create_dedicated_session(name: 'legion-dedicated') ⇒ Object
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/legion/transport/connection.rb', line 264 def create_dedicated_session(name: 'legion-dedicated') if lite_mode? # In-process transport is process-global; return the shared session so # that callers do not inadvertently reset all queues via Session#close. # Use an AtomicReference + compare_and_set so concurrent callers cannot # each create a separate InProcess::Session (whose #close calls Local.reset!). ref = (@session ||= Concurrent::AtomicReference.new(nil)) loop do shared = ref.value return shared if shared&.open? s = Legion::Transport::InProcess::Session.new s.start # Install this session only if no other thread has won the race. return s if ref.compare_and_set(shared, s) # Another thread installed a session first; discard this one and retry. end end sess = create_session_with_failover(connection_name: name) sess.start sess end |
.force_reconnect(connection_name: 'Legion') ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/legion/transport/connection.rb', line 169 def force_reconnect(connection_name: 'Legion') return if @shutting_down return unless begin_reconnect log.warn('Force reconnecting: pathological recovery loop detected') old = session pool_mode = !@pool.nil? reset_pool if pool_mode @session = nil @channel_thread = Concurrent::ThreadLocalVar.new(nil) @channel_registry = Concurrent::Hash.new @recovery_timestamps = [] tear_down_session(old) if old && !pool_mode setup(connection_name: connection_name) Array(@reconnect_callbacks).each do |cb| cb.call rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.reconnect_callback') end rescue StandardError => e handle_exception(e, level: :error, handled: true, operation: 'transport.connection.force_reconnect') ensure clear_reconnect_state end |
.lite_mode? ⇒ Boolean
21 22 23 |
# File 'lib/legion/transport/connection.rb', line 21 def lite_mode? Legion::Transport::TYPE == 'local' end |
.log_channel ⇒ Object
249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/legion/transport/connection.rb', line 249 def log_channel return nil if lite_mode? return @log_channel if @log_channel&.open? if session&.open? safely_close_log_channel @log_channel = session.create_channel @log_channel.prefetch(1) @log_channel end rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.log_channel') nil end |
.new ⇒ Object
29 30 31 |
# File 'lib/legion/transport/connection.rb', line 29 def new clone end |
.on_force_reconnect(&block) ⇒ Object
196 197 198 199 |
# File 'lib/legion/transport/connection.rb', line 196 def on_force_reconnect(&block) @reconnect_callbacks ||= [] @reconnect_callbacks << block end |
.open_build_session(connection_name: 'Legion::Build') ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/legion/transport/connection.rb', line 201 def open_build_session(connection_name: 'Legion::Build') return if lite_mode? return if @build_session @build_session = Concurrent::AtomicReference.new( create_session_with_failover(connection_name: connection_name) ) @build_session.value.start @build_channel_thread = Concurrent::ThreadLocalVar.new(nil) log.info 'Build session opened' end |
.reconnect(connection_name: 'Legion') ⇒ Object
37 38 39 40 41 42 |
# File 'lib/legion/transport/connection.rb', line 37 def reconnect(connection_name: 'Legion', **) @session = nil @channel_thread = Concurrent::ThreadLocalVar.new(nil) @channel_registry = Concurrent::Hash.new setup(connection_name: connection_name) end |
.session ⇒ Object
103 104 105 106 107 |
# File 'lib/legion/transport/connection.rb', line 103 def session return nil if @session.nil? @session.value end |
.session_open? ⇒ Boolean
126 127 128 129 130 131 132 133 134 |
# File 'lib/legion/transport/connection.rb', line 126 def session_open? current_session = session return false unless current_session current_session.open? rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.session_open?') false end |
.settings ⇒ Object
25 26 27 |
# File 'lib/legion/transport/connection.rb', line 25 def settings Legion::Settings[:transport] end |
.setup(connection_name: 'Legion') ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/legion/transport/connection.rb', line 44 def setup(connection_name: 'Legion', **) log.info("Using transport connector: #{Legion::Transport::CONNECTOR}") return setup_lite if lite_mode? pool_size = settings[:connection_pool_size].to_i if pool_size > 1 setup_pool(pool_size: pool_size, connection_name: connection_name) elsif session.respond_to?(:open?) && session.open? @channel_thread ||= Concurrent::ThreadLocalVar.new(nil) else rebuild_single_session(connection_name: connection_name) end register_session_callbacks reset_log_channel apply_quorum_policy_if_enabled true end |
.shutdown ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/legion/transport/connection.rb', line 136 def shutdown log.info 'Transport connection shutting down' @shutting_down = true pre_mark_sessions_closing close_build_session close_all_tracked_channels if @pool @pool.shutdown @pool = nil end return unless @session if lite_mode? session&.close @session = nil return end s = session return unless s tear_down_session(s) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.shutdown') ensure @log_channel = nil @session = nil @channel_registry = Concurrent::Hash.new @shutting_down = false end |