Class: RSMP::Supervisor
- Inherits:
-
Node
- Object
- Node
- RSMP::Supervisor
show all
- Defined in:
- lib/rsmp/supervisor.rb
Instance Attribute Summary collapse
Attributes inherited from Node
#archive, #clock, #collector, #deferred, #error_queue, #task
Attributes included from Task
#task
Attributes included from Logging
#archive
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Node
#author, #check_required_settings, #clear_deferred, #defer, #distribute_error, #do_deferred, #ignore_errors, #now, #process_deferred, #stop_subtasks
Methods included from Task
#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition
Methods included from Inspect
#inspect, #inspector
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options = {}) ⇒ Supervisor
Returns a new instance of Supervisor.
9
10
11
12
13
14
|
# File 'lib/rsmp/supervisor.rb', line 9
# Builds a supervisor.
#
# The supervisor settings are taken from options[:supervisor_settings]
# (an empty hash when that key is missing or nil) and processed before the
# full options hash is passed on to the superclass constructor.
#
# @param options [Hash] construction options
def initialize(options = {})
  settings = options[:supervisor_settings] || {}
  handle_supervisor_settings(settings)
  super(options)
  @proxies = []                                  # filled by accept_connection
  @site_id_condition = Async::Notification.new   # signalled by site_ids_changed
end
|
Instance Attribute Details
#core_version ⇒ Object
Returns the value of attribute core_version.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Returns @core_version. handle_supervisor_settings assigns it from the
# 'core_version' entry of the merged guest settings; the built-in defaults
# contain no such entry, so it is nil unless the caller supplies one.
def core_version
@core_version
end
|
#logger ⇒ Object
Returns the value of attribute logger.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Returns @logger.
# NOTE(review): @logger is not assigned in the visible code of this class;
# presumably it is set up by Node or the Logging mixin — confirm.
def logger
@logger
end
|
#proxies ⇒ Object
Returns the value of attribute proxies.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Returns @proxies, the array that #initialize creates empty and that
# accept_connection appends each newly built proxy to.
def proxies
@proxies
end
|
#site_id ⇒ Object
Returns the value of attribute site_id.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Returns @site_id.
# NOTE(review): nothing in the visible code of this class assigns @site_id —
# confirm where (or whether) it is set.
def site_id
@site_id
end
|
#supervisor_settings ⇒ Object
Returns the value of attribute supervisor_settings.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Returns @supervisor_settings, the hash that handle_supervisor_settings
# builds by deep-merging the caller's settings over the built-in defaults.
def supervisor_settings
@supervisor_settings
end
|
Class Method Details
.build_id_from_ip_port(ip, port) ⇒ Object
282
283
284
|
# File 'lib/rsmp/supervisor.rb', line 282
# Derives a short identifier from an ip/port pair.
#
# @param ip the ip, interpolated into the digested string
# @param port the port, interpolated into the digested string
# @return [String] the first nine hex characters of the MD5 digest of "ip:port"
def self.build_id_from_ip_port(ip, port)
  key = "#{ip}:#{port}"
  digest = Digest::MD5.hexdigest(key)
  digest[0, 9]
end
|
Instance Method Details
#accept?(socket, info) ⇒ Boolean
106
107
108
|
# File 'lib/rsmp/supervisor.rb', line 106
# Decides whether handle_connection should accept an incoming connection.
# This implementation ignores both arguments and always returns true.
def accept? socket, info
true
end
|
#accept_connection(socket, info) ⇒ Object
Accept an incoming connection by creating and starting a proxy.
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
# File 'lib/rsmp/supervisor.rb', line 144
# Accepts an incoming connection by creating (or reviving) a proxy for the
# connecting site, starting it and waiting for it.
#
# The site id is read from the 'siteId' attribute of the version message,
# which is peeked from the line protocol. If a proxy for that id already
# exists it is revived with the new connection settings, unless it is still
# connected, in which case the connection is refused. Otherwise a new proxy
# is built (subject to check_max_sites) and added to @proxies.
#
# @param socket the connected socket
# @param info [Hash] connection details; :ip and :port are read here
# @raise [ConnectionError] when a connected proxy with the same site id
#   exists (authorize_ip and check_max_sites raise it as well)
def accept_connection(socket, info)
  log "Site connected from #{format_ip_and_port(info)}",
      ip: info[:ip],
      port: info[:port],
      level: :info,
      timestamp: Clock.now

  authorize_ip info[:ip]

  stream = Async::IO::Stream.new socket
  protocol = Async::IO::Protocol::Line.new stream, Proxy::WRAPPING_DELIMITER

  settings = {
    supervisor: self,
    ip: info[:ip],
    port: info[:port],
    task: @task,
    collect: @collect,
    socket: socket,
    stream: stream,
    protocol: protocol,
    info: info,
    logger: @logger,
    archive: @archive
  }

  version_message = peek_version_message protocol
  id = version_message.attribute('siteId').first['sId']

  proxy = find_site id
  if proxy
    if proxy.connected?
      raise ConnectionError.new("Site #{id} already connected from port #{proxy.port}")
    else
      proxy.revive settings
    end
  else
    check_max_sites
    # two separate statements: build the proxy with the peeked id, then register it
    proxy = build_proxy settings.merge(site_id: id)
    @proxies.push proxy
  end

  proxy.setup_site_settings
  proxy.check_core_version version_message
  log "Validating using core version #{proxy.core_version}", level: :debug
  # start first, then wait; written on one line this would parse as
  # proxy.start(proxy.wait) and wait before starting
  proxy.start
  proxy.wait
ensure
  # runs on success and on failure: wake up waiters, and shut down after the
  # first connection when 'one_shot' is set
  site_ids_changed
  stop if @supervisor_settings['one_shot']
end
|
#aggregated_status_changed(site_proxy, component) ⇒ Object
279
280
|
# File 'lib/rsmp/supervisor.rb', line 279
# Takes a site proxy and a component; the body is empty, so this
# implementation does nothing and returns nil.
# NOTE(review): presumably an overridable notification hook — confirm against callers.
def aggregated_status_changed site_proxy, component
end
|
#authorize_ip(ip) ⇒ Object
122
123
124
125
126
|
# File 'lib/rsmp/supervisor.rb', line 122
# Checks an ip against the 'ips' supervisor setting.
#
# 'all' allows every ip. Any other String is treated as a list of addresses
# separated by commas and/or whitespace and is compared entry by entry;
# calling String#include? on it directly would be a substring test, so a
# whitelist of '10.0.0.10' would also admit '10.0.0.1'. Collections are
# asked with include?. A nil setting allows nothing.
#
# @param ip [String] the remote ip
# @return [nil] when the ip is allowed
# @raise [ConnectionError] when the ip is not allowed
def authorize_ip(ip)
  allowed = @supervisor_settings['ips']
  return if allowed == 'all'

  allowed = allowed.split(/[\s,]+/) if allowed.is_a?(String)
  return if allowed&.include?(ip)

  raise ConnectionError.new('guest ip not allowed')
end
|
#build_proxy(settings) ⇒ Object
110
111
112
|
# File 'lib/rsmp/supervisor.rb', line 110
# Creates the proxy object for a connecting site.
#
# @param settings [Hash] passed unchanged to SiteProxy.new
# @return [SiteProxy]
def build_proxy(settings)
  SiteProxy.new(settings)
end
|
#check_max_sites ⇒ Object
128
129
130
131
132
133
134
135
|
# File 'lib/rsmp/supervisor.rb', line 128
# Enforces the optional 'max_sites' supervisor setting.
#
# Does nothing when the setting is absent, or when the number of entries in
# @proxies is still below it.
#
# @return [nil]
# @raise [ConnectionError] when @proxies already holds 'max_sites' entries or more
def check_max_sites
  limit = @supervisor_settings['max_sites']
  return unless limit
  return if @proxies.size < limit

  raise ConnectionError.new("maximum of #{limit} sites already connected")
end
|
#check_site_already_connected(site_id) ⇒ Object
260
261
262
263
|
# File 'lib/rsmp/supervisor.rb', line 260
# Refuses a site id for which find_site already returns an entry.
#
# An entry that equals self is tolerated.
#
# @param site_id the site id to look up with find_site
# @return [nil] when no other entry exists
# @raise [HandshakeError] when find_site returns an entry other than self
def check_site_already_connected(site_id)
  existing = find_site(site_id)
  return if existing.nil? || existing == self

  raise HandshakeError.new("Site '#{site_id}' already connected")
end
|
#check_site_id(site_id) ⇒ Object
255
256
257
258
|
# File 'lib/rsmp/supervisor.rb', line 255
# Looks up the settings for a site id by delegating to
# site_id_to_site_setting, and returns whatever that returns.
#
# @param site_id the site id to look up
def check_site_id(site_id)
  site_id_to_site_setting(site_id)
end
|
#check_site_sxl_types ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/rsmp/supervisor.rb', line 43
# Validates that every configured site, plus the guest entry, names an SXL
# that RSMP::Schema.find_schemas! accepts.
#
# Works on a clone of the 'sites' setting (an empty hash when there is none)
# so that adding the 'guest' entry does not alter the stored settings.
#
# @return [Hash] the checked site-id => settings hash, including 'guest'
# @raise [RSMP::ConfigurationError] when a site has no settings, has no
#   'sxl' entry, or names an unknown schema
def check_site_sxl_types
  candidates = @supervisor_settings['sites'].clone || {}
  candidates['guest'] = @supervisor_settings['guest']

  candidates.each_pair do |site_id, settings|
    begin
      raise RSMP::ConfigurationError.new("Configuration for site '#{site_id}' is empty") unless settings

      sxl = settings['sxl']
      raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': No SXL specified") unless sxl

      RSMP::Schema.find_schemas!(sxl)
    rescue RSMP::Schema::UnknownSchemaError => e
      raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': #{e}")
    end
  end
end
|
#close(socket, info) ⇒ Object
205
206
207
208
209
210
211
212
213
|
# File 'lib/rsmp/supervisor.rb', line 205
# Logs that a connection was closed and closes the socket.
#
# When info is given, the log line includes the (possibly masked) address
# from format_ip_and_port and carries the ip as a log attribute; otherwise
# a generic message is logged.
#
# @param socket the socket to close
# @param info [Hash, nil] connection details, or nil when none are available
# @return the result of socket.close
def close(socket, info)
  if info
    origin = format_ip_and_port(info)
    log("Connection to #{origin} closed", ip: info[:ip], level: :info, timestamp: Clock.now)
  else
    log("Connection closed", level: :info, timestamp: Clock.now)
  end
  socket.close
end
|
#find_site(site_id) ⇒ Object
226
227
228
229
230
231
|
# File 'lib/rsmp/supervisor.rb', line 226
# Finds a proxy by site id.
#
# @param site_id the id to match against each proxy's site_id, or :any to
#   get the first proxy regardless of its id
# @return the first matching entry of @proxies, or nil when there is none
def find_site(site_id)
  @proxies.find { |proxy| site_id == :any || proxy.site_id == site_id }
end
|
#find_site_from_ip_port(ip, port) ⇒ Object
219
220
221
222
223
224
|
# File 'lib/rsmp/supervisor.rb', line 219
# Finds a proxy by the ip and port it reports.
#
# @param ip compared with each proxy's ip
# @param port compared with each proxy's port
# @return the first entry of @proxies matching both, or nil when there is none
def find_site_from_ip_port(ip, port)
  @proxies.find { |proxy| proxy.ip == ip && proxy.port == port }
end
|
114
115
116
117
118
119
120
|
# File 'lib/rsmp/supervisor.rb', line 114
# Formats a connection's address for log output.
#
# @param info [Hash] connection details; :ip and :port are read
# @return [String] '********' when the logger setting 'hide_ip_and_port' is
#   truthy, otherwise "ip:port"
def format_ip_and_port(info)
  return '********' if @logger.settings['hide_ip_and_port']

  "#{info[:ip]}:#{info[:port]}"
end
|
#handle_connection(socket) ⇒ Object
Handle an incoming connection by either accepting or rejecting it.
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/rsmp/supervisor.rb', line 85
# Handles an incoming connection by either accepting or rejecting it, and
# always closes the socket afterwards.
#
# ConnectionError and HandshakeError are logged as a rejected connection;
# any other StandardError is logged as an error and distributed with
# level :internal. Neither is re-raised.
#
# The rejection log line goes through format_ip_and_port, like the lines
# logged by accept_connection and close, so the logger's 'hide_ip_and_port'
# setting is honoured here too. With the setting off the text is unchanged.
#
# @param socket the accepted socket; remote_address is read from it
def handle_connection(socket)
  address = socket.remote_address
  remote_ip = address.ip_address
  remote_port = address.ip_port
  # :hostname carries the ip address, exactly as before
  info = { ip: remote_ip, port: remote_port, hostname: remote_ip, now: Clock.now }

  if accept?(socket, info)
    accept_connection(socket, info)
  else
    reject_connection(socket, info)
  end
rescue ConnectionError, HandshakeError => e
  log "Rejected connection from #{format_ip_and_port(info)}, #{e}", level: :warning
  distribute_error e
rescue StandardError => e
  log "Connection: #{e}", exception: e, level: :error
  distribute_error e, level: :internal
ensure
  # info is still nil here if reading the remote address failed; close copes with that
  close socket, info
end
|
#handle_supervisor_settings(supervisor_settings) ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/rsmp/supervisor.rb', line 20
# Merges the given supervisor settings over the built-in defaults, stores
# the result in @supervisor_settings, reads the guest core version into
# @core_version and validates the configured SXL types.
#
# @param supervisor_settings [Hash] caller-supplied settings; deep-merged
#   over the defaults, so nested guest values can be overridden individually
# @return the result of check_site_sxl_types
def handle_supervisor_settings(supervisor_settings)
  guest_defaults = {
    'sxl' => 'tlc',
    'intervals' => { 'timer' => 1, 'watchdog' => 1 },
    'timeouts' => { 'watchdog' => 2, 'acknowledgement' => 2 }
  }
  defaults = { 'port' => 12111, 'ips' => 'all', 'guest' => guest_defaults }

  @supervisor_settings = defaults.deep_merge(supervisor_settings)
  @core_version = @supervisor_settings['guest']['core_version']
  check_site_sxl_types
end
|
#ip_to_site_settings(ip) ⇒ Object
275
276
277
|
# File 'lib/rsmp/supervisor.rb', line 275
# Looks up site settings keyed by ip in the 'sites' supervisor setting,
# falling back to that hash's 'guest' entry when there is no truthy entry
# for the ip.
#
# @param ip the key to look up
# @return the matching settings, the 'guest' entry, or nil when neither exists
def ip_to_site_settings(ip)
  sites = @supervisor_settings['sites']
  sites[ip] || sites['guest']
end
|
#peek_version_message(protocol) ⇒ Object
137
138
139
140
141
|
# File 'lib/rsmp/supervisor.rb', line 137
# Builds a message from the next line of the protocol, obtained with
# peek_line.
#
# @param protocol an object responding to peek_line
# @return the result of Message.build for the parsed attributes and raw line
def peek_version_message(protocol)
  raw = protocol.peek_line
  Message.build(Message.parse_attributes(raw), raw)
end
|
#reject_connection(socket, info) ⇒ Object
201
202
203
|
# File 'lib/rsmp/supervisor.rb', line 201
# Logs that a connection was rejected. The socket is not touched here.
#
# @param socket unused
# @param info [Hash] connection details; :ip is attached to the log entry
# @return the result of log
def reject_connection(socket, info)
  log("Site rejected", ip: info[:ip], level: :info)
end
|
#run ⇒ Object
Listen for connections. Async::IO::Endpoint#accept creates an async task that we will wait for.
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/rsmp/supervisor.rb', line 62
# Listens for connections on the configured port, on all interfaces
# ('0.0.0.0'), and hands each accepted socket to handle_connection.
#
# Waits for every task returned by the endpoint's accept call. Any
# StandardError, whether raised while handling a socket or while setting up
# or waiting, is passed to distribute_error with level :internal instead of
# propagating.
def run
  port = @supervisor_settings["port"]
  log "Starting supervisor on port #{port}", level: :info, timestamp: @clock.now

  @endpoint = Async::IO::Endpoint.tcp('0.0.0.0', port)
  accept_tasks = @endpoint.accept do |socket|
    handle_connection(socket)
  rescue StandardError => e
    distribute_error e, level: :internal
  end
  accept_tasks.each(&:wait)
rescue StandardError => e
  distribute_error e, level: :internal
end
|
#site_connected?(site_id) ⇒ Boolean
215
216
217
|
# File 'lib/rsmp/supervisor.rb', line 215
# Tells whether find_site returns an entry for the given site id.
#
# @param site_id the id passed to find_site
# @return [Boolean] true when find_site returns something other than nil
def site_connected?(site_id)
  !find_site(site_id).nil?
end
|
#site_id_to_site_setting(site_id) ⇒ Object
265
266
267
268
269
270
271
272
273
|
# File 'lib/rsmp/supervisor.rb', line 265
# Returns the settings configured for a site id.
#
# An entry keyed by the site id itself always wins; the 'guest' entry of the
# 'sites' setting is only a fallback. A first-match scan over the hash would
# instead let a 'guest' entry listed before the site shadow the site's own
# settings, making the result depend on the order of the configuration.
#
# @param site_id the site id to look up
# @return the settings stored for the site id, else those stored under
#   'guest'; an empty hash when no 'sites' setting exists at all
# @raise [HandshakeError] when 'sites' has neither the site id nor 'guest'
def site_id_to_site_setting(site_id)
  sites = @supervisor_settings['sites']
  return {} unless sites
  return sites[site_id] if sites.key?(site_id)
  return sites['guest'] if sites.key?('guest')

  raise HandshakeError.new("site id #{site_id} unknown")
end
|
#site_ids_changed ⇒ Object
197
198
199
|
# File 'lib/rsmp/supervisor.rb', line 197
# Signals @site_id_condition, the notification that wait_for_site and
# wait_for_site_disconnect pass to wait_for_condition. accept_connection
# calls this from its ensure clause.
def site_ids_changed
@site_id_condition.signal
end
|
#stop ⇒ Object
79
80
81
82
|
# File 'lib/rsmp/supervisor.rb', line 79
# Logs that the supervisor is stopping, then defers to the superclass
# implementation of stop.
#
# The log line interpolates the 'site_id' supervisor setting.
def stop
  supervisor_id = @supervisor_settings["site_id"]
  log("Stopping supervisor #{supervisor_id}", level: :info)
  super
end
|
#wait_for_site(site_id, timeout:) ⇒ Object
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/rsmp/supervisor.rb', line 233
# Returns the proxy for a site id, waiting for it to appear if necessary.
#
# When find_site already returns an entry it is returned at once. Otherwise
# wait_for_condition is called on @site_id_condition with a block that
# repeats the lookup.
#
# @param site_id the site id to wait for, or :any
# @param timeout passed to wait_for_condition and quoted in the error message
# @return the entry found immediately, else the result of wait_for_condition
# @raise [RSMP::TimeoutError] when wait_for_condition raises Async::TimeoutError
def wait_for_site(site_id, timeout:)
  found = find_site(site_id)
  return found if found

  wait_for_condition(@site_id_condition, timeout: timeout) { find_site(site_id) }
rescue Async::TimeoutError
  description = site_id == :any ? "No site connected" : "Site '#{site_id}' did not connect"
  raise RSMP::TimeoutError.new("#{description} within #{timeout}s")
end
|
#wait_for_site_disconnect(site_id, timeout:) ⇒ Object
249
250
251
252
253
|
# File 'lib/rsmp/supervisor.rb', line 249
# Waits until find_site no longer returns an entry for the site id.
#
# The block handed to wait_for_condition yields true once the site is gone
# and nil while it is still present.
#
# @param site_id the site id to watch
# @param timeout passed to wait_for_condition and quoted in the error message
# @return the result of wait_for_condition
# @raise [RSMP::TimeoutError] when wait_for_condition raises Async::TimeoutError
def wait_for_site_disconnect(site_id, timeout:)
  wait_for_condition(@site_id_condition, timeout: timeout) do
    find_site(site_id) ? nil : true
  end
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new("Site '#{site_id}' did not disconnect within #{timeout}s")
end
|