Class: RSMP::Supervisor
- Inherits:
-
Node
- Object
- Node
- RSMP::Supervisor
show all
- Defined in:
- lib/rsmp/supervisor.rb
Instance Attribute Summary collapse
Attributes inherited from Node
#archive, #clock, #collector, #deferred, #error_queue, #task
Attributes included from Task
#task
Attributes included from Logging
#archive
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Node
#author, #check_required_settings, #clear_deferred, #defer, #do_deferred, #ignore_errors, #notify_error, #process_deferred, #stop_subtasks
Methods included from Task
#initialize_task, #restart, #start, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition
Methods included from Inspect
#inspect, #inspector
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options = {}) ⇒ Supervisor
Returns a new instance of Supervisor.
9
10
11
12
13
14
|
# File 'lib/rsmp/supervisor.rb', line 9
def initialize options={}
handle_supervisor_settings( options[:supervisor_settings] || {} )
super options
@proxies = []
@site_id_condition = Async::Notification.new
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
def logger
@logger
end
|
#proxies ⇒ Object
Returns the value of attribute proxies.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
def proxies
@proxies
end
|
#rsmp_versions ⇒ Object
Returns the value of attribute rsmp_versions.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
def rsmp_versions
@rsmp_versions
end
|
#site_id ⇒ Object
Returns the value of attribute site_id.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
def site_id
@site_id
end
|
#supervisor_settings ⇒ Object
Returns the value of attribute supervisor_settings.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
def supervisor_settings
@supervisor_settings
end
|
Class Method Details
.build_id_from_ip_port(ip, port) ⇒ Object
283
284
285
|
# File 'lib/rsmp/supervisor.rb', line 283
def self.build_id_from_ip_port ip, port
Digest::MD5.hexdigest("#{ip}:#{port}")[0..8]
end
|
Instance Method Details
#accept?(socket, info) ⇒ Boolean
107
108
109
|
# File 'lib/rsmp/supervisor.rb', line 107
def accept? socket, info
true
end
|
#accept_connection(socket, info) ⇒ Object
accept an incoming connecting by creating and starting a proxy
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/rsmp/supervisor.rb', line 145
def accept_connection socket, info
log "Site connected from #{format_ip_and_port(info)}",
ip: info[:ip],
port: info[:port],
level: :info,
timestamp: Clock.now
authorize_ip info[:ip]
stream = Async::IO::Stream.new socket
protocol = Async::IO::Protocol::Line.new stream, Proxy::WRAPPING_DELIMITER
settings = {
supervisor: self,
ip: info[:ip],
port: info[:port],
task: @task,
collect: @collect,
socket: socket,
stream: stream,
protocol: protocol,
info: info,
logger: @logger,
archive: @archive
}
version_message = peek_version_message protocol
id = version_message.attribute('siteId').first['sId']
proxy = find_site id
if proxy
if proxy.connected?
raise ConnectionError.new("Site #{id} alredy connected from port #{proxy.port}")
else
proxy.revive settings
end
else
check_max_sites
proxy = build_proxy settings.merge(site_id:id) @proxies.push proxy
end
proxy.setup_site_settings
proxy.check_rsmp_version version_message
log "Validating using core version #{proxy.rsmp_version}", level: :debug
proxy.start proxy.wait
ensure
site_ids_changed
stop if @supervisor_settings['one_shot']
end
|
#aggregated_status_changed(site_proxy, component) ⇒ Object
280
281
|
# File 'lib/rsmp/supervisor.rb', line 280
def aggregated_status_changed site_proxy, component
end
|
#authorize_ip(ip) ⇒ Object
123
124
125
126
127
|
# File 'lib/rsmp/supervisor.rb', line 123
def authorize_ip ip
return if @supervisor_settings['ips'] == 'all'
return if @supervisor_settings['ips'].include? ip
raise ConnectionError.new('guest ip not allowed')
end
|
#build_proxy(settings) ⇒ Object
111
112
113
|
# File 'lib/rsmp/supervisor.rb', line 111
def build_proxy settings
SiteProxy.new settings
end
|
#check_max_sites ⇒ Object
129
130
131
132
133
134
135
136
|
# File 'lib/rsmp/supervisor.rb', line 129
def check_max_sites
max = @supervisor_settings['max_sites']
if max
if @proxies.size >= max
raise ConnectionError.new("maximum of #{max} sites already connected")
end
end
end
|
#check_site_already_connected(site_id) ⇒ Object
261
262
263
264
|
# File 'lib/rsmp/supervisor.rb', line 261
def check_site_already_connected site_id
site = find_site(site_id)
raise HandshakeError.new "Site '#{site_id}' already connected" if site != nil && site != self
end
|
#check_site_id(site_id) ⇒ Object
256
257
258
259
|
# File 'lib/rsmp/supervisor.rb', line 256
def check_site_id site_id
return site_id_to_site_setting site_id
end
|
#check_site_sxl_types ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/rsmp/supervisor.rb', line 44
def check_site_sxl_types
sites = @supervisor_settings['sites'].clone || {}
sites['guest'] = @supervisor_settings['guest']
sites.each do |site_id,settings|
unless settings
raise RSMP::ConfigurationError.new("Configuration for site '#{site_id}' is empty")
end
sxl = settings['sxl']
unless sxl
raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': No SXL specified")
end
RSMP::Schema.find_schemas! sxl if sxl
rescue RSMP::Schema::UnknownSchemaError => e
raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': #{e}")
end
end
|
#close(socket, info) ⇒ Object
206
207
208
209
210
211
212
213
214
|
# File 'lib/rsmp/supervisor.rb', line 206
def close socket, info
if info
log "Connection to #{format_ip_and_port(info)} closed", ip: info[:ip], level: :info, timestamp: Clock.now
else
log "Connection closed", level: :info, timestamp: Clock.now
end
socket.close
end
|
#find_site(site_id) ⇒ Object
227
228
229
230
231
232
|
# File 'lib/rsmp/supervisor.rb', line 227
def find_site site_id
@proxies.each do |site|
return site if site_id == :any || site.site_id == site_id
end
nil
end
|
#find_site_from_ip_port(ip, port) ⇒ Object
220
221
222
223
224
225
|
# File 'lib/rsmp/supervisor.rb', line 220
def find_site_from_ip_port ip, port
@proxies.each do |site|
return site if site.ip == ip && site.port == port
end
nil
end
|
115
116
117
118
119
120
121
|
# File 'lib/rsmp/supervisor.rb', line 115
def format_ip_and_port info
if @logger.settings['hide_ip_and_port']
'********'
else
"#{info[:ip]}:#{info[:port]}"
end
end
|
#handle_connection(socket) ⇒ Object
handle an incoming connction by either accepting of rejecting it
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/rsmp/supervisor.rb', line 86
def handle_connection socket
remote_port = socket.remote_address.ip_port
remote_hostname = socket.remote_address.ip_address
remote_ip = socket.remote_address.ip_address
info = {ip:remote_ip, port:remote_port, hostname:remote_hostname, now:Clock.now}
if accept? socket, info
accept_connection socket, info
else
reject_connection socket, info
end
rescue ConnectionError => e
log "Rejected connection from #{remote_ip}:#{remote_port}, #{e.to_s}", level: :warning
notify_error e
rescue StandardError => e
log "Connection: #{e.to_s}", exception: e, level: :error
notify_error e, level: :internal
ensure
close socket, info
end
|
#handle_supervisor_settings(supervisor_settings) ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/rsmp/supervisor.rb', line 20
def handle_supervisor_settings supervisor_settings
defaults = {
'port' => 12111,
'ips' => 'all',
'guest' => {
'rsmp_versions' => 'all',
'sxl' => 'tlc',
'intervals' => {
'timer' => 1,
'watchdog' => 1
},
'timeouts' => {
'watchdog' => 2,
'acknowledgement' => 2
}
}
}
@supervisor_settings = defaults.deep_merge(supervisor_settings)
@rsmp_versions = @supervisor_settings["guest"]["rsmp_versions"]
check_site_sxl_types
end
|
#ip_to_site_settings(ip) ⇒ Object
276
277
278
|
# File 'lib/rsmp/supervisor.rb', line 276
def ip_to_site_settings ip
@supervisor_settings['sites'][ip] || @supervisor_settings['sites']['guest']
end
|
#peek_version_message(protocol) ⇒ Object
138
139
140
141
142
|
# File 'lib/rsmp/supervisor.rb', line 138
def peek_version_message protocol
json = protocol.peek_line
attributes = Message.parse_attributes json
Message.build attributes, json
end
|
#reject_connection(socket, info) ⇒ Object
202
203
204
|
# File 'lib/rsmp/supervisor.rb', line 202
def reject_connection socket, info
log "Site rejected", ip: info[:ip], level: :info
end
|
#run ⇒ Object
listen for connections Async::IO::Endpoint#accept createa an async task that we will wait for
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/rsmp/supervisor.rb', line 63
def run
log "Starting supervisor on port #{@supervisor_settings["port"]}",
level: :info,
timestamp: @clock.now
@endpoint = Async::IO::Endpoint.tcp('0.0.0.0', @supervisor_settings["port"])
tasks = @endpoint.accept do |socket| handle_connection(socket)
rescue StandardError => e
notify_error e, level: :internal
end
tasks.each { |task| task.wait }
rescue StandardError => e
notify_error e, level: :internal
end
|
#site_connected?(site_id) ⇒ Boolean
216
217
218
|
# File 'lib/rsmp/supervisor.rb', line 216
def site_connected? site_id
return find_site(site_id) != nil
end
|
#site_id_to_site_setting(site_id) ⇒ Object
266
267
268
269
270
271
272
273
274
|
# File 'lib/rsmp/supervisor.rb', line 266
def site_id_to_site_setting site_id
return {} unless @supervisor_settings['sites']
@supervisor_settings['sites'].each_pair do |id,settings|
if id == 'guest' || id == site_id
return settings
end
end
raise HandshakeError.new "site id #{site_id} unknown"
end
|
#site_ids_changed ⇒ Object
198
199
200
|
# File 'lib/rsmp/supervisor.rb', line 198
def site_ids_changed
@site_id_condition.signal
end
|
#stop ⇒ Object
80
81
82
83
|
# File 'lib/rsmp/supervisor.rb', line 80
def stop
log "Stopping supervisor #{@supervisor_settings["site_id"]}", level: :info
super
end
|
#wait_for_site(site_id, timeout:) ⇒ Object
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
# File 'lib/rsmp/supervisor.rb', line 234
def wait_for_site site_id, timeout:
site = find_site site_id
return site if site
wait_for_condition(@site_id_condition,timeout:timeout) do
find_site site_id
end
rescue Async::TimeoutError
if site_id == :any
str = "No site connected"
else
str = "Site '#{site_id}' did not connect"
end
raise RSMP::TimeoutError.new "#{str} within #{timeout}s"
end
|
#wait_for_site_disconnect(site_id, timeout:) ⇒ Object
250
251
252
253
254
|
# File 'lib/rsmp/supervisor.rb', line 250
def wait_for_site_disconnect site_id, timeout:
wait_for_condition(@site_id_condition,timeout:timeout) { true unless find_site site_id }
rescue Async::TimeoutError
raise RSMP::TimeoutError.new "Site '#{site_id}' did not disconnect within #{timeout}s"
end
|