Class: RSMP::Supervisor
- Inherits:
-
Node
- Object
- Node
- RSMP::Supervisor
show all
- Defined in:
- lib/rsmp/supervisor.rb
Instance Attribute Summary collapse
Attributes inherited from Node
#archive, #clock, #collector, #deferred, #error_queue, #task
Attributes included from Logging
#archive
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Node
#author, #check_required_settings, #clear_deferred, #defer, #do_deferred, #do_start, #exiting, #idle, #ignore_errors, #notify_error, #process_deferred, #restart, #start
Methods included from Inspect
#inspect, #inspector
Methods included from Wait
#wait_for
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options = {}) ⇒ Supervisor
Returns a new instance of Supervisor.
9
10
11
12
13
14
|
# File 'lib/rsmp/supervisor.rb', line 9
# Creates a supervisor.
#
# The supervisor settings are read from options[:supervisor_settings]
# (an empty hash when absent) and handled before the superclass
# initializer runs; afterwards the proxy list and the notification used
# to signal site changes are created.
#
# @param options [Hash] node options; :supervisor_settings is read here
def initialize(options = {})
  supervisor_config = options[:supervisor_settings] || {}
  handle_supervisor_settings(supervisor_config)
  super(options)
  @proxies = []
  @site_id_condition = Async::Notification.new
end
|
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Reader for the @logger instance variable.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
|
#proxies ⇒ Object
Returns the value of attribute proxies.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Reader for the @proxies instance variable (the same object, not a copy).
#
# @return [Object] the current value of @proxies
def proxies
  @proxies
end
|
#rsmp_versions ⇒ Object
Returns the value of attribute rsmp_versions.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Reader for the @rsmp_versions instance variable.
#
# @return [Object] the current value of @rsmp_versions
def rsmp_versions
  @rsmp_versions
end
|
#site_id ⇒ Object
Returns the value of attribute site_id.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Reader for the @site_id instance variable.
#
# @return [Object] the current value of @site_id
def site_id
  @site_id
end
|
#supervisor_settings ⇒ Object
Returns the value of attribute supervisor_settings.
7
8
9
|
# File 'lib/rsmp/supervisor.rb', line 7
# Reader for the @supervisor_settings instance variable (the same object,
# not a copy).
#
# @return [Object] the current value of @supervisor_settings
def supervisor_settings
  @supervisor_settings
end
|
Class Method Details
.build_id_from_ip_port(ip, port) ⇒ Object
273
274
275
|
# File 'lib/rsmp/supervisor.rb', line 273
# Derives a short identifier from an ip/port pair: the first nine
# characters of the hexadecimal MD5 digest of "ip:port".
#
# @param ip [#to_s] ip part of the endpoint
# @param port [#to_s] port part of the endpoint
# @return [String] nine hexadecimal characters
def self.build_id_from_ip_port(ip, port)
  endpoint = "#{ip}:#{port}"
  Digest::MD5.hexdigest(endpoint).slice(0, 9)
end
|
Instance Method Details
#accept?(socket, info) ⇒ Boolean
108
109
110
|
# File 'lib/rsmp/supervisor.rb', line 108
# Tells whether an incoming connection should be served.
# This implementation accepts unconditionally; neither argument is inspected.
#
# @param socket [Object] the incoming socket (unused here)
# @param info [Hash] connection details (unused here)
# @return [Boolean] always true
def accept?(socket, info)
  true
end
|
#aggregated_status_changed(site_proxy, component) ⇒ Object
270
271
|
# File 'lib/rsmp/supervisor.rb', line 270
# Intentionally empty in this class: does nothing with its arguments.
#
# @param site_proxy [Object] unused here
# @param component [Object] unused here
# @return [nil]
def aggregated_status_changed(site_proxy, component)
end
|
#authorize_ip(ip) ⇒ Object
124
125
126
127
128
|
# File 'lib/rsmp/supervisor.rb', line 124
# Checks an ip against the 'ips' supervisor setting.
#
# Passes when the setting equals the string 'all' or when the setting
# responds true to include?(ip).
# NOTE(review): if 'ips' is configured as a single String, include? is a
# substring match rather than an exact one - confirm the expected type.
#
# @param ip [String] remote ip to check
# @return [nil] when the ip is allowed
# @raise [ConnectionError] when the ip is not allowed
def authorize_ip(ip)
  allowed = @supervisor_settings['ips']
  return if allowed == 'all' || allowed.include?(ip)

  raise ConnectionError.new('guest ip not allowed')
end
|
#build_proxy(settings) ⇒ Object
112
113
114
|
# File 'lib/rsmp/supervisor.rb', line 112
# Builds the proxy object for a connecting site.
#
# @param settings [Hash] passed unchanged to SiteProxy.new
# @return [SiteProxy] the newly created proxy
def build_proxy(settings)
  SiteProxy.new(settings)
end
|
#check_max_sites ⇒ Object
130
131
132
133
134
135
136
137
|
# File 'lib/rsmp/supervisor.rb', line 130
# Enforces the optional 'max_sites' supervisor setting.
#
# Does nothing when the setting is absent; otherwise refuses once the
# number of proxies has reached the limit.
#
# @return [nil] when another site may connect
# @raise [ConnectionError] when the limit is already reached
def check_max_sites
  limit = @supervisor_settings['max_sites']
  return unless limit
  return if @proxies.size < limit

  raise ConnectionError.new("maximum of #{limit} sites already connected")
end
|
#check_site_already_connected(site_id) ⇒ Object
251
252
253
254
|
# File 'lib/rsmp/supervisor.rb', line 251
# Refuses a site id that find_site already resolves to some object other
# than this supervisor.
#
# @param site_id [Object] id to look up with find_site
# @return [nil] when no conflicting site is found
# @raise [HandshakeError] when a different object is found for the id
def check_site_already_connected(site_id)
  existing = find_site(site_id)
  return if existing.nil? || existing == self

  raise HandshakeError.new "Site '#{site_id}' already connected"
end
|
#check_site_id(site_id) ⇒ Object
246
247
248
249
|
# File 'lib/rsmp/supervisor.rb', line 246
# Delegates to site_id_to_site_setting and returns its result unchanged;
# any error raised there propagates to the caller.
#
# @param site_id [Object] id handed to site_id_to_site_setting
# @return [Object] whatever site_id_to_site_setting returns
def check_site_id(site_id)
  site_id_to_site_setting(site_id)
end
|
#check_site_sxl_types ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/rsmp/supervisor.rb', line 44
# Validates the SXL configured for every site and for the guest settings.
#
# Every entry of the 'sites' setting, plus the 'guest' setting stored
# under the key 'guest', must be non-empty and name an 'sxl' that
# RSMP::Schemer.find_schemas! accepts.
#
# @return [Hash] the combined site settings that were checked
# @raise [RSMP::ConfigurationError] when a site has no settings, has no
#   SXL, or names an SXL for which no schema is found
def check_site_sxl_types
  # Build a fresh hash instead of cloning: clone keeps the frozen state of
  # the original, so adding the guest entry to a frozen 'sites' hash would
  # raise. merge also leaves the stored settings untouched.
  configured = @supervisor_settings['sites'] || {}
  sites = configured.merge('guest' => @supervisor_settings['guest'])
  sites.each do |site_id, settings|
    unless settings
      raise RSMP::ConfigurationError.new("Configuration for site '#{site_id}' is empty")
    end

    sxl = settings['sxl']
    unless sxl
      raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': No SXL specified")
    end

    RSMP::Schemer.find_schemas! sxl
  rescue RSMP::Schemer::UnknownSchemaError => e
    # Report schema lookup failures as configuration errors naming the site.
    raise RSMP::ConfigurationError.new("Configuration error for site '#{site_id}': #{e}")
  end
end
|
#close(socket, info) ⇒ Object
199
200
201
202
203
204
205
206
207
|
# File 'lib/rsmp/supervisor.rb', line 199
# Logs that a connection was closed and then closes the socket.
#
# When info is given, the log entry names the remote endpoint (as rendered
# by format_ip_and_port) and carries the ip; otherwise a generic entry is
# logged.
#
# @param socket [#close] socket to close
# @param info [Hash, nil] connection details with :ip, or nil
# @return [Object] the result of socket.close
def close(socket, info)
  if info
    endpoint = format_ip_and_port(info)
    log("Connection to #{endpoint} closed", ip: info[:ip], level: :info, timestamp: Clock.now)
  else
    log('Connection closed', level: :info, timestamp: Clock.now)
  end
  socket.close
end
|
#connect(socket, info) ⇒ Object
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/rsmp/supervisor.rb', line 146
# Serves one incoming site connection.
#
# Logs the connection, authorizes the remote ip, wraps the socket in a
# line protocol and reads the site id via peek_version_message. A known
# proxy for that id is revived with the new connection settings, unless
# it reports being connected, which is an error. An unknown id gets a new
# proxy, subject to check_max_sites. The proxy is then run.
#
# Whatever happens, site_ids_changed is signalled on the way out and the
# supervisor stops itself when the 'one_shot' setting is set.
#
# @param socket [Object] the accepted socket
# @param info [Hash] connection details; :ip and :port are read
# @return [Object] the result of proxy.run
# @raise [ConnectionError] when the site id already has a connected proxy
#   (authorize_ip and check_max_sites raise it as well)
def connect(socket, info)
  log "Site connected from #{format_ip_and_port(info)}",
      ip: info[:ip],
      port: info[:port],
      level: :info,
      timestamp: Clock.now
  authorize_ip info[:ip]

  stream = Async::IO::Stream.new socket
  protocol = Async::IO::Protocol::Line.new stream, Proxy::WRAPPING_DELIMITER
  settings = {
    supervisor: self,
    ip: info[:ip],
    port: info[:port],
    task: @task,
    collect: @collect,
    socket: socket,
    stream: stream,
    protocol: protocol,
    info: info,
    logger: @logger,
    archive: @archive
  }

  id = peek_version_message protocol
  proxy = find_site id
  if proxy
    if proxy.connected?
      raise ConnectionError.new("Site #{id} already connected from port #{proxy.port}")
    end

    proxy.revive settings
  else
    check_max_sites
    # Keep the id learned from the peeked message in the proxy settings.
    proxy = build_proxy settings.merge(site_id: id)
    @proxies.push proxy
  end
  proxy.run
ensure
  site_ids_changed
  stop if @supervisor_settings['one_shot']
end
|
#find_site(site_id) ⇒ Object
220
221
222
223
224
225
|
# File 'lib/rsmp/supervisor.rb', line 220
# Looks up a proxy by site id.
#
# The special id :any matches the first proxy, whatever its id.
#
# @param site_id [Object] id to look for, or :any
# @return [Object, nil] the first matching proxy, or nil when none matches
def find_site(site_id)
  @proxies.find { |proxy| site_id == :any || proxy.site_id == site_id }
end
|
#find_site_from_ip_port(ip, port) ⇒ Object
213
214
215
216
217
218
|
# File 'lib/rsmp/supervisor.rb', line 213
# Looks up a proxy by the ip and port it reports.
#
# @param ip [Object] compared with each proxy's ip
# @param port [Object] compared with each proxy's port
# @return [Object, nil] the first proxy matching both, or nil
def find_site_from_ip_port(ip, port)
  @proxies.find { |proxy| proxy.ip == ip && proxy.port == port }
end
|
#format_ip_and_port(info) ⇒ Object
116
117
118
119
120
121
122
|
# File 'lib/rsmp/supervisor.rb', line 116
# Renders the remote endpoint for log messages.
#
# Returns a fixed mask when the logger setting 'hide_ip_and_port' is set,
# otherwise "ip:port" built from the info hash.
#
# @param info [Hash] connection details; :ip and :port are read
# @return [String] masked or plain endpoint text
def format_ip_and_port(info)
  return '********' if @logger.settings['hide_ip_and_port']

  "#{info[:ip]}:#{info[:port]}"
end
|
#handle_connection(socket) ⇒ Object
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/rsmp/supervisor.rb', line 81
# Handles one accepted socket from start to finish.
#
# Collects the remote ip and port into an info hash (the hostname entry is
# filled with the ip address as well), then either connects or rejects the
# site depending on accept?. A ConnectionError is logged as a warning,
# any other StandardError as an error; both are passed to notify_error.
# The connection is always closed on the way out.
#
# @param socket [Object] accepted socket responding to remote_address
# @return [Object] the result of connect/reject, or of notify_error
def handle_connection(socket)
  peer_port = socket.remote_address.ip_port
  peer_host = socket.remote_address.ip_address
  peer_ip = socket.remote_address.ip_address
  info = { ip: peer_ip, port: peer_port, hostname: peer_host, now: Clock.now }
  accept?(socket, info) ? connect(socket, info) : reject(socket, info)
rescue ConnectionError => error
  log "Rejected connection from #{peer_ip}:#{peer_port}, #{error}", level: :warning
  notify_error error
rescue StandardError => error
  log "Connection: #{error}", exception: error, level: :error
  notify_error error, level: :internal
ensure
  # info is still nil here when the failure happened before it was built.
  close socket, info
end
|
#handle_supervisor_settings(supervisor_settings) ⇒ Object
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/rsmp/supervisor.rb', line 20
# Merges the given settings over the built-in defaults and validates them.
#
# Stores the merged result in @supervisor_settings, copies the guest
# 'rsmp_versions' into @rsmp_versions and finally runs
# check_site_sxl_types on the result.
#
# @param supervisor_settings [Hash] settings deep-merged over the defaults
# @return [Object] the result of check_site_sxl_types
def handle_supervisor_settings(supervisor_settings)
  guest_defaults = {
    'rsmp_versions' => 'all',
    'sxl' => 'tlc',
    'intervals' => { 'timer' => 1, 'watchdog' => 1 },
    'timeouts' => { 'watchdog' => 2, 'acknowledgement' => 2 }
  }
  defaults = { 'port' => 12111, 'ips' => 'all', 'guest' => guest_defaults }

  @supervisor_settings = defaults.deep_merge(supervisor_settings)
  @rsmp_versions = @supervisor_settings['guest']['rsmp_versions']
  check_site_sxl_types
end
|
#ip_to_site_settings(ip) ⇒ Object
266
267
268
|
# File 'lib/rsmp/supervisor.rb', line 266
# Looks up site settings by ip in the 'sites' supervisor setting, falling
# back to the 'guest' entry of that same hash.
#
# The built-in defaults define no 'sites' key, so a missing hash is
# treated as "no sites configured" and yields an empty hash, in line with
# site_id_to_site_setting, instead of failing on nil.
#
# @param ip [Object] key to look up in the 'sites' hash
# @return [Object, nil] the matching settings, the 'guest' entry, {} when
#   no sites are configured, or nil when neither entry exists
def ip_to_site_settings(ip)
  sites = @supervisor_settings['sites']
  return {} unless sites

  sites[ip] || sites['guest']
end
|
#peek_version_message(protocol) ⇒ Object
139
140
141
142
143
144
|
# File 'lib/rsmp/supervisor.rb', line 139
# Reads the site id from the next line of the protocol via peek_line.
#
# The line is parsed and built into a message; the 'sId' of the first
# entry of its 'siteId' attribute is returned.
#
# @param protocol [#peek_line] line protocol wrapping the connection
# @return [Object] the 'sId' value of the first 'siteId' entry
def peek_version_message(protocol)
  raw = protocol.peek_line
  version_message = Message.build(Message.parse_attributes(raw), raw)
  site_ids = version_message.attribute('siteId')
  site_ids.first['sId']
end
|
#reject(socket, info) ⇒ Object
195
196
197
|
# File 'lib/rsmp/supervisor.rb', line 195
# Logs that a site was rejected. The socket itself is not touched here.
#
# @param socket [Object] the rejected socket (unused here)
# @param info [Hash] connection details; :ip is logged
# @return [Object] the result of log
def reject(socket, info)
  log('Site rejected', ip: info[:ip], level: :info)
end
|
#site_connected?(site_id) ⇒ Boolean
209
210
211
|
# File 'lib/rsmp/supervisor.rb', line 209
# Tells whether find_site knows a proxy for the given site id.
#
# @param site_id [Object] id handed to find_site
# @return [Boolean] true when find_site returns something other than nil
def site_connected?(site_id)
  !find_site(site_id).nil?
end
|
#site_id_to_site_setting(site_id) ⇒ Object
256
257
258
259
260
261
262
263
264
|
# File 'lib/rsmp/supervisor.rb', line 256
# Resolves the settings for a site id from the 'sites' supervisor setting.
#
# An entry keyed by the exact site id wins. Only when there is none does
# the 'guest' entry of the same hash apply, so a 'guest' entry listed
# before a specific site no longer shadows that site's own settings.
#
# @param site_id [Object] id to look up
# @return [Object] the site's settings, the 'guest' settings, or {} when
#   no sites are configured
# @raise [HandshakeError] when sites are configured but neither the id nor
#   'guest' is among them
def site_id_to_site_setting(site_id)
  sites = @supervisor_settings['sites']
  return {} unless sites
  return sites[site_id] if sites.key?(site_id)
  return sites['guest'] if sites.key?('guest')

  raise HandshakeError.new "site id #{site_id} unknown"
end
|
#site_ids_changed ⇒ Object
191
192
193
|
# File 'lib/rsmp/supervisor.rb', line 191
# Signals @site_id_condition, the notification created in the constructor.
#
# @return [Object] the result of the signal call
def site_ids_changed
  @site_id_condition.signal
end
|
#start_action ⇒ Object
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/rsmp/supervisor.rb', line 61
# Creates a TCP endpoint on 0.0.0.0 at the configured 'port' and hands
# every accepted socket to handle_connection.
#
# Errors escaping a single connection, as well as errors from setting up
# or running the endpoint itself, are passed to notify_error with level
# :internal rather than raised.
#
# @return [Object] the result of accept, or of notify_error on failure
def start_action
  port = @supervisor_settings["port"]
  @endpoint = Async::IO::Endpoint.tcp('0.0.0.0', port)
  @endpoint.accept do |socket|
    handle_connection(socket)
  rescue StandardError => error
    notify_error error, level: :internal
  end
rescue StandardError => error
  notify_error error, level: :internal
end
|
#starting ⇒ Object
102
103
104
105
106
|
# File 'lib/rsmp/supervisor.rb', line 102
# Logs an info entry announcing the port the supervisor starts on,
# timestamped with @clock.now.
#
# @return [Object] the result of log
def starting
  port = @supervisor_settings["port"]
  log("Starting supervisor on port #{port}", level: :info, timestamp: @clock.now)
end
|
#stop ⇒ Object
72
73
74
75
76
77
78
79
|
# File 'lib/rsmp/supervisor.rb', line 72
# Stops the supervisor.
#
# Logs the stop, stops every proxy and empties the proxy list, invokes the
# superclass implementation, then closes and forgets @tcp_server when one
# is set.
#
# @return [nil]
def stop
  log "Stopping supervisor #{@supervisor_settings["site_id"]}", level: :info
  @proxies.each(&:stop)
  @proxies.clear
  super
  @tcp_server&.close
  @tcp_server = nil
end
|
#wait_for_site(site_id, timeout) ⇒ Object
227
228
229
230
231
232
233
234
235
236
237
238
|
# File 'lib/rsmp/supervisor.rb', line 227
# Returns the proxy for a site id, waiting for it to appear if necessary.
#
# A proxy that find_site already knows is returned right away. Otherwise
# wait_for is called on @site_id_condition with a block that looks the id
# up again.
#
# @param site_id [Object] id to wait for, or :any
# @param timeout [Object] passed to wait_for and shown in the error text
# @return [Object] the proxy, or whatever wait_for returns
# @raise [RSMP::TimeoutError] when wait_for raises Async::TimeoutError
def wait_for_site(site_id, timeout)
  site = find_site(site_id)
  return site if site

  wait_for(@site_id_condition, timeout) { find_site(site_id) }
rescue Async::TimeoutError
  reason = site_id == :any ? "No site connected" : "Site '#{site_id}' did not connect"
  raise RSMP::TimeoutError.new "#{reason} within #{timeout}s"
end
|
#wait_for_site_disconnect(site_id, timeout) ⇒ Object
240
241
242
243
244
|
# File 'lib/rsmp/supervisor.rb', line 240
# Waits until find_site no longer knows the given site id.
#
# Calls wait_for on @site_id_condition with a block that yields true once
# the site is gone and nil while it is still found.
#
# @param site_id [Object] id expected to disappear
# @param timeout [Object] passed to wait_for and shown in the error text
# @return [Object] whatever wait_for returns
# @raise [RSMP::TimeoutError] when wait_for raises Async::TimeoutError
def wait_for_site_disconnect(site_id, timeout)
  wait_for(@site_id_condition, timeout) { find_site(site_id) ? nil : true }
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new "Site '#{site_id}' did not disconnect within #{timeout}s"
end
|