Module: Legion::Transport::Connection
- Extended by:
- Logging::Helper, SSL, Vault
- Defined in:
- lib/legion/transport/connection.rb,
lib/legion/transport/connection/ssl.rb,
lib/legion/transport/connection/vault.rb
Defined Under Namespace
Modules: SSL, Vault
Constant Summary
collapse
- RECOVERY_WINDOW =
60
- MAX_RECOVERIES_PER_WINDOW =
5
Class Method Summary
collapse
Methods included from Vault
vault_pki_enabled?, vault_pki_tls_options
Methods included from SSL
tls_options
Class Method Details
.build_channel ⇒ Object
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/legion/transport/connection.rb', line 195
# Returns the channel to use on the build session for the calling context.
#
# Delegates to #channel when no build session is set. Otherwise an open channel
# already held in @build_channel_thread is reused; failing that, a new channel is
# created on the build session, stored, given the configured prefetch and returned.
#
# @return [Object] the channel (project-specific type)
def build_channel
  return channel unless @build_session

  held = @build_channel_thread.value
  return held if held&.open?

  build_conn = @build_session.value
  fresh = build_conn.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
  # Store before configuring so the slot holds the channel even if prefetch raises.
  @build_channel_thread.value = fresh
  fresh.prefetch(settings[:prefetch])
  fresh
end
|
.build_session_open? ⇒ Boolean
227
228
229
|
# File 'lib/legion/transport/connection.rb', line 227
# Reports whether a build session is present and open.
#
# @return [Boolean] true only when the wrapped session's +open?+ returns exactly
#   +true+; false when no build session is set or it wraps nil
def build_session_open?
  build_conn = @build_session&.value
  build_conn&.open? == true
end
|
.channel ⇒ Object
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/legion/transport/connection.rb', line 61
# Returns a channel appropriate for the calling context.
#
# Three paths, checked in order:
# 1. The current thread is flagged with :legion_build_session and a build session
#    exists: delegate to #build_channel.
# 2. A pool is configured: check a session out, start it, open a new channel on
#    it, and always check the session back in. Failures are reported and re-raised.
# 3. Otherwise: reuse the open channel held in @channel_thread or create a new one
#    on the shared session.
#
# @return [Object] the channel (project-specific type)
# @raise [StandardError] whatever channel creation raised in pool mode
def channel
  return build_channel if Thread.current[:legion_build_session] && @build_session

  if @pool
    pooled = @pool.checkout
    begin
      start_session(pooled)
      fresh = pooled.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
      fresh.prefetch(settings[:prefetch])
      return fresh
    rescue StandardError => e
      # fresh is nil here when create_channel itself failed.
      safe_close_channel(fresh)
      handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel', pooled: true)
      raise
    ensure
      @pool.checkin(pooled) if pooled
    end
  end

  held = @channel_thread.value
  return held if held&.open?

  created = session.create_channel(nil, settings[:channel][:default_worker_pool_size], false, 10)
  @channel_thread.value = created
  created.prefetch(settings[:prefetch])
  log.debug "Channel created for thread #{Thread.current.object_id}"
  created
end
|
.channel_open? ⇒ Boolean
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/legion/transport/connection.rb', line 99
# Reports whether a usable channel is available.
#
# With a pool configured the answer is delegated to #session_open?. Otherwise the
# channel held in @channel_thread is asked directly. Any error raised while
# checking is reported through handle_exception and treated as "not open".
#
# @return [Boolean] false when no channel is held or the check raised
def channel_open?
  if @pool
    session_open?
  else
    held = @channel_thread&.value
    held ? held.open? : false
  end
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.channel_open?')
  false
end
|
.channel_thread ⇒ Object
95
96
97
|
# File 'lib/legion/transport/connection.rb', line 95
# Returns the same value as #channel; this method only delegates to it.
#
# NOTE(review): despite the name it does not return the @channel_thread holder —
# presumably kept as an alias for older callers; confirm.
#
# @return [Object] whatever #channel returns
def channel_thread
channel
end
|
.close_build_session ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
# File 'lib/legion/transport/connection.rb', line 206
# Closes the build session, if one exists, and clears the build-related state.
#
# The close is given 10 seconds. On timeout the error is reported and the
# session is handed to safely_close_build_transport; on any other error it is
# only reported. In every handled case @build_session and @build_channel_thread
# end up nil.
#
# @return [Object, nil] nil when no build session was set
def close_build_session
  return unless @build_session

  build_conn = @build_session.value
  if build_conn&.open?
    Timeout.timeout(10) { build_conn.close }
  end
  @build_session = @build_channel_thread = nil
  log.info 'Build session closed (all build channels released)'
rescue Timeout::Error => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.close_build_session')
  # The graceful close timed out; hand the session to the fallback closer.
  stuck = @build_session&.value
  safely_close_build_transport(stuck)
  @build_session = @build_channel_thread = nil
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.close_build_session')
  @build_session = @build_channel_thread = nil
end
|
.create_dedicated_session(name: 'legion-dedicated') ⇒ Object
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
|
# File 'lib/legion/transport/connection.rb', line 246
# Returns a started session for a caller that wants its own connection.
#
# In lite mode no separate session is created: the open session held in @session
# is returned, or a new in-process session is started and installed there with a
# compare-and-set, retrying when another writer got in first. Outside lite mode a
# new session is built via create_session_with_failover and started.
#
# NOTE(review): an in-process session that loses the compare-and-set has already
# been started and is neither returned nor closed here — confirm that is harmless.
#
# @param name [String] connection name used for the non-lite session
# @return [Object] the started session
def create_dedicated_session(name: 'legion-dedicated')
  if lite_mode?
    holder = (@session ||= Concurrent::AtomicReference.new(nil))
    loop do
      current = holder.value
      return current if current&.open?

      candidate = Legion::Transport::InProcess::Session.new
      candidate.start
      return candidate if holder.compare_and_set(current, candidate)
    end
  end

  dedicated = create_session_with_failover(connection_name: name)
  dedicated.start
  dedicated
end
|
.force_reconnect(connection_name: 'Legion') ⇒ Object
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/legion/transport/connection.rb', line 152
# Discards the current transport state, rebuilds it through #setup and then runs
# every callback registered with #on_force_reconnect.
#
# Nothing happens while @shutting_down is set or when begin_reconnect returns a
# falsy value. Errors raised during the rebuild are reported through
# handle_exception instead of propagating; a failing callback is reported on its
# own and does not stop the remaining callbacks.
#
# NOTE(review): assumes begin_reconnect / clear_reconnect_state act as an
# acquire / release pair for an "in progress" marker — confirm.
#
# @param connection_name [String] name forwarded to #setup
# @return [Object, nil] nil when the reconnect was skipped
def force_reconnect(connection_name: 'Legion')
  # Remember whether this call obtained the reconnect marker. The ensure clause
  # covers the early returns too, and a call that was skipped must not clear a
  # marker that belongs to a reconnect still running elsewhere.
  acquired = false
  return if @shutting_down

  acquired = begin_reconnect
  return unless acquired

  log.warn('Force reconnecting: pathological recovery loop detected')
  old = session
  pool_mode = !@pool.nil?
  reset_pool if pool_mode
  @session = nil
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  @recovery_timestamps = []
  # In pool mode the sessions belong to the pool, which was reset above.
  tear_down_session(old) if old && !pool_mode
  setup(connection_name: connection_name)
  Array(@reconnect_callbacks).each do |cb|
    cb.call
  rescue StandardError => e
    handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.reconnect_callback')
  end
rescue StandardError => e
  handle_exception(e, level: :error, handled: true, operation: 'transport.connection.force_reconnect')
ensure
  clear_reconnect_state if acquired
end
|
.lite_mode? ⇒ Boolean
20
21
22
|
# File 'lib/legion/transport/connection.rb', line 20
# Reports whether the transport runs in lite mode, i.e. whether
# Legion::Transport::TYPE equals the string 'local'.
#
# @return [Boolean]
def lite_mode?
Legion::Transport::TYPE == 'local'
end
|
.log_channel ⇒ Object
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# File 'lib/legion/transport/connection.rb', line 231
# Returns a channel kept separately in @log_channel, creating it on demand.
#
# Returns nil in lite mode and when the shared session is missing or not open.
# An already open @log_channel is reused; otherwise the old one is released via
# safely_close_log_channel and a new channel with a prefetch of 1 is opened.
# Errors are reported through handle_exception and turned into nil.
#
# @return [Object, nil] the channel, or nil when none is available
def log_channel
  return nil if lite_mode?
  return @log_channel if @log_channel&.open?
  return unless session&.open?

  safely_close_log_channel
  @log_channel = session.create_channel
  @log_channel.prefetch(1)
  @log_channel
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.log_channel')
  nil
end
|
.new ⇒ Object
28
29
30
|
# File 'lib/legion/transport/connection.rb', line 28
# Returns a copy of the receiver produced by +clone+ rather than instantiating
# anything.
#
# NOTE(review): +clone+ is a shallow copy, so objects referenced from instance
# variables would be shared with the copy — confirm this is what callers expect.
#
# @return [Object] the cloned receiver
def new
clone
end
|
.on_force_reconnect(&block) ⇒ Object
178
179
180
181
|
# File 'lib/legion/transport/connection.rb', line 178
# Registers a block to be called by #force_reconnect after the transport has
# been set up again.
#
# Calling this without a block registers nothing: storing nil would make every
# later forced reconnect fail on that entry and log a spurious warning.
#
# @yield called with no arguments after each forced reconnect
# @return [Array<Proc>] all callbacks registered so far
def on_force_reconnect(&block)
  @reconnect_callbacks ||= []
  @reconnect_callbacks << block if block
  @reconnect_callbacks
end
|
.open_build_session(connection_name: 'Legion::Build') ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/legion/transport/connection.rb', line 183
# Opens the separate build session, unless running in lite mode or one is
# already set.
#
# The session is created and started before any instance state is assigned, and
# @build_channel_thread is set before @build_session. If start raises, no
# half-initialised build session is left behind, so a later call can retry and
# #build_channel never sees a build session without its channel holder.
#
# @param connection_name [String] name passed to create_session_with_failover
# @return [Object, nil] nil when skipped
# @raise [StandardError] whatever creating or starting the session raises
def open_build_session(connection_name: 'Legion::Build')
  return if lite_mode?
  return if @build_session

  build_conn = create_session_with_failover(connection_name: connection_name)
  build_conn.start
  @build_channel_thread = Concurrent::ThreadLocalVar.new(nil)
  @build_session = Concurrent::AtomicReference.new(build_conn)
  log.info 'Build session opened'
end
|
.reconnect(connection_name: 'Legion', **) ⇒ Object
36
37
38
39
40
|
# File 'lib/legion/transport/connection.rb', line 36
# Drops the current session reference, replaces the per-thread channel holder
# with an empty one and runs #setup again.
#
# NOTE(review): the previous session is only dereferenced here, not closed
# (#force_reconnect calls tear_down_session on it in non-pool mode) — confirm
# callers close it beforehand.
#
# @param connection_name [String] name forwarded to #setup; any other keyword
#   arguments are accepted and ignored
# @return [Object] whatever #setup returns
def reconnect(connection_name: 'Legion', **)
@session = nil
@channel_thread = Concurrent::ThreadLocalVar.new(nil)
setup(connection_name: connection_name)
end
|
.session ⇒ Object
89
90
91
92
93
|
# File 'lib/legion/transport/connection.rb', line 89
# Returns the session currently held in @session.
#
# @return [Object, nil] the wrapped session, or nil when @session is not set
def session
  @session&.value
end
|
.session_open? ⇒ Boolean
112
113
114
115
116
117
118
119
120
|
# File 'lib/legion/transport/connection.rb', line 112
# Reports whether the shared session exists and says it is open.
#
# Errors raised while asking are reported through handle_exception and treated
# as "not open".
#
# @return [Boolean] false when there is no session or the check raised
def session_open?
  active = session
  active ? active.open? : false
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.session_open?')
  false
end
|
.settings ⇒ Object
24
25
26
|
# File 'lib/legion/transport/connection.rb', line 24
# Returns the :transport entry of Legion::Settings.
#
# @return [Object] the transport settings (other methods here index it with
#   symbols such as :channel and :prefetch)
def settings
Legion::Settings[:transport]
end
|
.setup(connection_name: 'Legion', **) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/legion/transport/connection.rb', line 42
# Prepares the transport connection.
#
# Lite mode is handed to setup_lite. Otherwise one of three things happens:
# a pool is set up when the configured :connection_pool_size is greater than 1,
# an already open session is kept (only making sure @channel_thread exists), or
# the single session is rebuilt. Session callbacks, the log channel and the
# quorum policy are then refreshed.
#
# @param connection_name [String] name for new connections; any other keyword
#   arguments are accepted and ignored
# @return [Object] true, or whatever setup_lite returns in lite mode
def setup(connection_name: 'Legion', **)
  log.info("Using transport connector: #{Legion::Transport::CONNECTOR}")
  return setup_lite if lite_mode?

  # nil.to_i is 0, so a missing setting selects the single-session path.
  wanted = settings[:connection_pool_size].to_i
  if wanted > 1
    setup_pool(pool_size: wanted, connection_name: connection_name)
  else
    if session.respond_to?(:open?) && session.open?
      @channel_thread ||= Concurrent::ThreadLocalVar.new(nil)
    else
      rebuild_single_session(connection_name: connection_name)
    end
  end

  register_session_callbacks
  reset_log_channel
  apply_quorum_policy_if_enabled
  true
end
|
.shutdown ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/legion/transport/connection.rb', line 122
# Shuts the transport down: closes the build session, shuts down and drops the
# pool if there is one, then closes the shared session.
#
# In lite mode the session is closed directly; otherwise it is passed to
# tear_down_session. Errors are reported through handle_exception instead of
# propagating. Whatever happens, @log_channel and @session are cleared and the
# @shutting_down flag, raised at the start, is lowered again.
#
# @return [Object, nil] nil when there was no session to close
def shutdown
  log.info 'Transport connection shutting down'
  @shutting_down = true
  close_build_session

  if @pool
    @pool.shutdown
    @pool = nil
  end

  if @session
    if lite_mode?
      session&.close
      @session = nil
    else
      live = session
      tear_down_session(live) if live
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.connection.shutdown')
ensure
  @log_channel = nil
  @session = nil
  @shutting_down = false
end
|