Module: Legion::Transport::Connection
- Extended by:
- Logging::Helper, SSL, Vault
- Defined in:
- lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb
Defined Under Namespace
Modules: SSL, Vault
Constant Summary
collapse
- RECOVERY_WINDOW =
60
- MAX_RECOVERIES_PER_WINDOW =
5
Class Method Summary
collapse
Methods included from Vault
vault_pki_enabled?, vault_pki_tls_options
Methods included from SSL
tls_options
Class Method Details
.build_channel ⇒ Object
199
200
201
202
203
204
205
206
207
208
|
# File 'lib/legion/transport/connection.rb', line 199
def build_channel
return channel unless @build_session
return @build_channel_thread.value if @build_channel_thread.value&.open?
@build_channel_thread.value = @build_session.value.create_channel(
nil, settings[:channel][:default_worker_pool_size], false, 10
)
@build_channel_thread.value.prefetch(settings[:prefetch])
@build_channel_thread.value
end
|
.build_session_open? ⇒ Boolean
231
232
233
|
# File 'lib/legion/transport/connection.rb', line 231
def build_session_open?
@build_session&.value&.open? == true
end
|
.channel ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/legion/transport/connection.rb', line 61
def channel
return build_channel if Thread.current[:legion_build_session] && @build_session
if @pool
sess = @pool.checkout
begin
start_session(sess)
ch = sess.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
ch.prefetch(settings[:prefetch])
return ch
rescue StandardError => e
safe_close_channel(ch)
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel', pooled: true)
raise
ensure
@pool.checkin(sess) if sess
end
end
return @channel_thread.value if !@channel_thread.value.nil? && @channel_thread.value.open?
s = session
raise IOError, 'transport session unavailable (recovery in progress)' unless s&.open?
@channel_thread.value = s.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
@channel_thread.value.prefetch(settings[:prefetch])
log.debug "Channel created for thread #{Thread.current.object_id}"
@channel_thread.value
end
|
.channel_open? ⇒ Boolean
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/legion/transport/connection.rb', line 102
def channel_open?
return session_open? if @pool
current_channel = @channel_thread&.value
return false unless current_channel
current_channel.open?
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel_open?')
false
end
|
.channel_thread ⇒ Object
98
99
100
|
# File 'lib/legion/transport/connection.rb', line 98
def channel_thread
channel
end
|
.close_build_session ⇒ Object
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
# File 'lib/legion/transport/connection.rb', line 210
def close_build_session
return unless @build_session
s = @build_session.value
Timeout.timeout(10) { s.close } if s&.open?
@build_session = nil
@build_channel_thread = nil
log.info 'Build session closed (all build channels released)'
rescue Timeout::Error => e
handle_exception(e, level: :warn, handled: true,
operation: 'transport.connection.close_build_session')
bs = @build_session&.value
safely_close_build_transport(bs)
@build_session = nil
@build_channel_thread = nil
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.close_build_session')
@build_session = nil
@build_channel_thread = nil
end
|
.create_dedicated_session(name: 'legion-dedicated') ⇒ Object
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
# File 'lib/legion/transport/connection.rb', line 250
def create_dedicated_session(name: 'legion-dedicated')
if lite_mode?
ref = (@session ||= Concurrent::AtomicReference.new(nil))
loop do
shared = ref.value
return shared if shared&.open?
s = Legion::Transport::InProcess::Session.new
s.start
return s if ref.compare_and_set(shared, s)
end
end
sess = create_session_with_failover(connection_name: name)
sess.start
sess
end
|
.force_reconnect(connection_name: 'Legion') ⇒ Object
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
# File 'lib/legion/transport/connection.rb', line 156
def force_reconnect(connection_name: 'Legion')
return if @shutting_down
return unless begin_reconnect
log.warn('Force reconnecting: pathological recovery loop detected')
old = session
pool_mode = !@pool.nil?
reset_pool if pool_mode
@session = nil
@channel_thread = Concurrent::ThreadLocalVar.new(nil)
@recovery_timestamps = []
tear_down_session(old) if old && !pool_mode
setup(connection_name: connection_name)
Array(@reconnect_callbacks).each do |cb|
cb.call
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.reconnect_callback')
end
rescue StandardError => e
handle_exception(e, level: :error, handled: true, operation: 'transport.connection.force_reconnect')
ensure
clear_reconnect_state
end
|
.lite_mode? ⇒ Boolean
20
21
22
|
# File 'lib/legion/transport/connection.rb', line 20
def lite_mode?
Legion::Transport::TYPE == 'local'
end
|
.log_channel ⇒ Object
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
# File 'lib/legion/transport/connection.rb', line 235
def log_channel
return nil if lite_mode?
return @log_channel if @log_channel&.open?
if session&.open?
safely_close_log_channel
@log_channel = session.create_channel
@log_channel.prefetch(1)
@log_channel
end
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.log_channel')
nil
end
|
.new ⇒ Object
28
29
30
|
# File 'lib/legion/transport/connection.rb', line 28
def new
clone
end
|
.on_force_reconnect(&block) ⇒ Object
182
183
184
185
|
# File 'lib/legion/transport/connection.rb', line 182
def on_force_reconnect(&block)
@reconnect_callbacks ||= []
@reconnect_callbacks << block
end
|
.open_build_session(connection_name: 'Legion::Build') ⇒ Object
187
188
189
190
191
192
193
194
195
196
197
|
# File 'lib/legion/transport/connection.rb', line 187
def open_build_session(connection_name: 'Legion::Build')
return if lite_mode?
return if @build_session
@build_session = Concurrent::AtomicReference.new(
create_session_with_failover(connection_name: connection_name)
)
@build_session.value.start
@build_channel_thread = Concurrent::ThreadLocalVar.new(nil)
log.info 'Build session opened'
end
|
.reconnect(connection_name: 'Legion') ⇒ Object
36
37
38
39
40
|
# File 'lib/legion/transport/connection.rb', line 36
def reconnect(connection_name: 'Legion', **)
@session = nil
@channel_thread = Concurrent::ThreadLocalVar.new(nil)
setup(connection_name: connection_name)
end
|
.session ⇒ Object
92
93
94
95
96
|
# File 'lib/legion/transport/connection.rb', line 92
def session
return nil if @session.nil?
@session.value
end
|
.session_open? ⇒ Boolean
115
116
117
118
119
120
121
122
123
|
# File 'lib/legion/transport/connection.rb', line 115
def session_open?
current_session = session
return false unless current_session
current_session.open?
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.session_open?')
false
end
|
.settings ⇒ Object
24
25
26
|
# File 'lib/legion/transport/connection.rb', line 24
def settings
Legion::Settings[:transport]
end
|
.setup(connection_name: 'Legion') ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/legion/transport/connection.rb', line 42
def setup(connection_name: 'Legion', **)
log.info("Using transport connector: #{Legion::Transport::CONNECTOR}")
return setup_lite if lite_mode?
pool_size = settings[:connection_pool_size].to_i
if pool_size > 1
setup_pool(pool_size: pool_size, connection_name: connection_name)
elsif session.respond_to?(:open?) && session.open?
@channel_thread ||= Concurrent::ThreadLocalVar.new(nil)
else
rebuild_single_session(connection_name: connection_name)
end
register_session_callbacks
reset_log_channel
apply_quorum_policy_if_enabled
true
end
|
.shutdown ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# File 'lib/legion/transport/connection.rb', line 125
def shutdown
log.info 'Transport connection shutting down'
@shutting_down = true
pre_mark_sessions_closing
close_build_session
if @pool
@pool.shutdown
@pool = nil
end
return unless @session
if lite_mode?
session&.close
@session = nil
return
end
s = session
return unless s
tear_down_session(s)
rescue StandardError => e
handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.shutdown')
ensure
@log_channel = nil
@session = nil
@shutting_down = false
end
|