Class: RSMP::SiteProxy
Constant Summary
Constants inherited
from Proxy
Proxy::WRAPPING_DELIMITER
Instance Attribute Summary collapse
Attributes included from Components
#components
Attributes inherited from Proxy
#archive, #collector, #connection_info, #ip, #port, #state, #sxl
Attributes included from Task
#task
Attributes included from Notifier
#listeners
Attributes included from Logging
#archive, #logger
Instance Method Summary
collapse
-
#acknowledged_first_ingoing(message) ⇒ Object
-
#acknowledged_first_outgoing(message) ⇒ Object
-
#aggregated_status_changed(component, options = {}) ⇒ Object
-
#check_site_ids(message) ⇒ Object
-
#check_sxl_version(message) ⇒ Object
-
#find_site_settings(site_id) ⇒ Object
-
#handshake_complete ⇒ Object
-
#initialize(options) ⇒ SiteProxy
constructor
A new instance of SiteProxy.
-
#inspect ⇒ Object
-
#node ⇒ Object
-
#notify_error(e, options = {}) ⇒ Object
-
#process_aggregated_status(message) ⇒ Object
-
#process_alarm(message) ⇒ Object
-
#process_command_response(message) ⇒ Object
-
#process_message(message) ⇒ Object
-
#process_status_response(message) ⇒ Object
-
#process_status_update(message) ⇒ Object
-
#process_version(message) ⇒ Object
-
#process_watchdog(message) ⇒ Object
-
#request_aggregated_status(component, options = {}) ⇒ Object
-
#request_status(component, status_list, options = {}) ⇒ Object
-
#revive(options) ⇒ Object
-
#run ⇒ Object
Handle communication. When we're created, the socket is already open.
-
#send_alarm_acknowledgement(component, alarm_code, options = {}) ⇒ Object
-
#send_command(component, command_list, options = {}) ⇒ Object
-
#set_watchdog_interval(interval) ⇒ Object
-
#setup_site_settings ⇒ Object
-
#site_ids_changed ⇒ Object
-
#subscribe_to_status(component_id, status_list, options = {}) ⇒ Object
-
#sxl_version ⇒ Object
-
#unsubscribe_to_status(component_id, status_list, options = {}) ⇒ Object
-
#validate_aggregated_status(message, se) ⇒ Object
-
#validate_ready(action) ⇒ Object
-
#version_accepted(message) ⇒ Object
-
#version_acknowledged ⇒ Object
Methods included from Components
#add_component, #build_component, #check_main_component, #find_component, #infer_component_type, #initialize_components, #setup_components
Methods inherited from Proxy
#acknowledge, #author, #buffer_message, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #check_rsmp_version, #check_watchdog_timeout, #clear, #clock, #close, #close_socket, #close_stream, #connected?, #disconnect, #disconnected?, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #expect_version_message, #extraneous_version, #find_original_for_message, #get_schemas, #log, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #log_send, #process_ack, #process_deferred, #process_not_ack, #process_packet, #read_line, #ready?, #rsmp_versions, #run_reader, #run_timer, #send_and_optionally_collect, #send_message, #send_version, #send_watchdog, #set_state, #setup, #should_validate_ingoing_message?, #start_reader, #start_timer, #start_watchdog, #state_changed, #stop_reader, #stop_subtasks, #stop_task, #stop_timer, #timer, #verify_sequence, #wait_for_reader, #wait_for_state, #watchdog_send_timer, #will_not_handle
Methods included from Task
#initialize_task, #restart, #start, #stop, #stop_subtasks, #stop_task, #task_status, #wait, #wait_for_condition
Methods included from Inspect
#inspector
Methods included from Notifier
#add_listener, #clear_deferred_notify, #deferred_notify, #dequeue_notify, #distribute_error, #initialize_distributor, #notify, #notify_without_defer, #remove_listener
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options) ⇒ SiteProxy
Returns a new instance of SiteProxy.
9
10
11
12
13
14
15
|
# File 'lib/rsmp/site_proxy.rb', line 9
# Builds a proxy for a site connection owned by a supervisor.
#
# The parent initializer receives the options with :node set to the
# supervisor. Afterwards components are initialized and the supervisor,
# a clone of its settings, and the optional site id are stored.
#
# @param options [Hash] reads :supervisor and :site_id
def initialize(options)
  owner = options[:supervisor]
  super(options.merge(node: owner))
  initialize_components
  @supervisor = owner
  @settings = @supervisor.supervisor_settings.clone
  @site_id = options[:site_id]
end
|
Instance Attribute Details
#site_id ⇒ Object
Returns the value of attribute site_id.
7
8
9
|
# File 'lib/rsmp/site_proxy.rb', line 7
# Reader for @site_id.
#
# @return [Object] the stored site id, or nil when none has been set
def site_id
  @site_id
end
|
#supervisor ⇒ Object
Returns the value of attribute supervisor.
7
8
9
|
# File 'lib/rsmp/site_proxy.rb', line 7
# Reader for @supervisor.
#
# @return [Object] the stored supervisor, or nil when none has been set
def supervisor
  @supervisor
end
|
Instance Method Details
#acknowledged_first_ingoing(message) ⇒ Object
95
96
97
98
99
100
|
# File 'lib/rsmp/site_proxy.rb', line 95
# Reacts to a message of the given kind having been acknowledged on the
# ingoing side: for a "Watchdog" message, send_watchdog is called.
# Other message types are ignored (nil is returned).
#
# @param message [#type] the message that was acknowledged
def acknowledged_first_ingoing(message)
  send_watchdog if message.type == "Watchdog"
end
|
#acknowledged_first_outgoing(message) ⇒ Object
102
103
104
105
106
107
|
# File 'lib/rsmp/site_proxy.rb', line 102
# Reacts to a message of the given kind having been acknowledged on the
# outgoing side: for a "Watchdog" message, handshake_complete is called.
# Other message types are ignored (nil is returned).
#
# @param message [#type] the message that was acknowledged
def acknowledged_first_outgoing(message)
  handshake_complete if message.type == "Watchdog"
end
|
#aggregated_status_changed(component, options = {}) ⇒ Object
155
156
157
|
# File 'lib/rsmp/site_proxy.rb', line 155
# Forwards an aggregated status change to the supervisor, passing this
# proxy and the component. The options argument is accepted but not used.
#
# @param component [Object] the component whose aggregated status changed
# @param options [Hash] unused here
def aggregated_status_changed(component, options = {})
  @supervisor.aggregated_status_changed(self, component)
end
|
#check_site_ids(message) ⇒ Object
322
323
324
325
326
327
328
329
|
# File 'lib/rsmp/site_proxy.rb', line 322
# Takes the first "sId" listed in the message's "siteId" attribute, lets
# the supervisor check it, stores it as @site_id, then loads the site
# settings and signals that site ids changed.
#
# @param message [#attribute] message carrying a "siteId" list
def check_site_ids(message)
  announced_ids = message.attribute("siteId").map { |entry| entry["sId"] }
  first_id = announced_ids.first
  @supervisor.check_site_id(first_id)
  @site_id = first_id
  setup_site_settings
  site_ids_changed
end
|
#check_sxl_version(message) ⇒ Object
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
|
# File 'lib/rsmp/site_proxy.rb', line 292
# Checks that a schema exists for the SXL version announced in the message
# (the SXL type is fixed to 'tlc' here) and stores that version in
# @site_sxl_version. When the schema is unknown, the message is rejected
# via dont_acknowledge instead.
#
# @param message [#attribute, #type] message carrying an 'SXL' attribute
def check_sxl_version(message)
  sxl_type = 'tlc'
  requested = message.attribute('SXL')
  RSMP::Schemer.find_schema!(sxl_type, requested, lenient: true)
  @site_sxl_version = requested
rescue RSMP::Schemer::UnknownSchemaError => error
  dont_acknowledge(message, "Rejected #{message.type} message,", "#{error}")
end
|
#find_site_settings(site_id) ⇒ Object
331
332
333
334
335
336
337
338
339
340
341
342
343
344
|
# File 'lib/rsmp/site_proxy.rb', line 331
# Looks up the settings to use for a site.
#
# The site-specific entry under @settings['sites'] wins; otherwise the
# 'guest' settings are used when present.
#
# The lookup uses the site_id argument. Previously the argument was ignored
# and @site_id was read instead, which only worked because the caller
# happened to pass @site_id.
#
# @param site_id [Object] key to look up under @settings['sites']
# @return [Object, nil] the matching settings, the guest settings, or nil
def find_site_settings(site_id)
  sites = @settings['sites']
  if sites && sites[site_id]
    log "Using site settings for site id #{site_id}", level: :debug
    return sites[site_id]
  end

  guest = @settings['guest']
  if guest
    log "Using site settings for guest", level: :debug
    return guest
  end

  nil
end
|
#handshake_complete ⇒ Object
47
48
49
50
51
52
|
# File 'lib/rsmp/site_proxy.rb', line 47
# Runs the parent's handshake completion, logs that the connection to the
# site is established (with core version, SXL name and the sanitized SXL
# version), then starts the watchdog.
def handshake_complete
  super
  sxl_version_label = RSMP::Schemer.sanitize_version(@site_sxl_version)
  log(
    "Connection to site #{@site_id} established, using core #{@rsmp_version}, #{@sxl} #{sxl_version_label}",
    level: :log
  )
  start_watchdog
end
|
#inspect ⇒ Object
37
38
39
40
41
|
# File 'lib/rsmp/site_proxy.rb', line 37
# Short textual representation: class name, object id and whatever
# inspector returns for the listed instance variables.
#
# @return [String]
def inspect
  details = inspector(:@acknowledgements, :@settings, :@site_settings, :@components)
  "#<#{self.class.name}:#{self.object_id}, #{details}>"
end
|
#node ⇒ Object
43
44
45
|
# File 'lib/rsmp/site_proxy.rb', line 43
# The node of this proxy is its supervisor.
#
# @return [Object] whatever #supervisor returns
def node
  supervisor
end
|
#notify_error(e, options = {}) ⇒ Object
356
357
358
359
|
# File 'lib/rsmp/site_proxy.rb', line 356
# Reports an error to the supervisor (when one is set) and then hands it
# to distribute_error.
#
# @param e [Exception] the error to report
# @param options [Hash] passed through unchanged to both receivers
def notify_error(e, options = {})
  if @supervisor
    @supervisor.notify_error(e, options)
  end
  distribute_error(e, options)
end
|
#process_aggregated_status(message) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/rsmp/site_proxy.rb', line 139
# Handles an incoming aggregated status message.
#
# The "se" attribute is validated first (validate_aggregated_status raises
# on invalid input, so its return value is not inspected; the previous
# `== false` comparison was discarded and has been removed). The component
# named by "cId" is then looked up: when missing, the message is rejected
# via dont_acknowledge; otherwise the component's aggregated status is
# updated, the reception is logged, and the message is acknowledged.
#
# @param message [#attribute, #attributes, #type] the received message
def process_aggregated_status(message)
  se = message.attribute("se")
  validate_aggregated_status(message, se)

  c_id = message.attributes["cId"]
  component = find_component c_id
  unless component
    reason = "component #{c_id} not found"
    dont_acknowledge message, "Ignoring #{message.type}:", reason
    return
  end

  component.set_aggregated_status_bools se
  log "Received #{message.type} status for component #{c_id} [#{component.aggregated_status.join(', ')}]", message: message
  acknowledge message
end
|
#process_alarm(message) ⇒ Object
159
160
161
162
163
164
165
166
167
|
# File 'lib/rsmp/site_proxy.rb', line 159
# Handles an incoming alarm message: looks up the component named by
# "cId", lets it handle the alarm, logs the alarm code, "aSp" value and
# the "ack"/"aS"/"sS" values, then acknowledges the message.
#
# @param message [#attribute, #type] the received alarm message
def process_alarm(message)
  component = find_component(message.attribute("cId"))
  status = %w[ack aS sS].map { |name| message.attribute(name) }.join(',')
  component.handle_alarm(message)
  code = message.attribute("aCId")
  specialization = message.attribute("aSp")
  summary = "Received #{message.type}, #{code} #{specialization} [#{status}]"
  log(summary, message: message, level: :log)
  acknowledge(message)
end
|
#process_command_response(message) ⇒ Object
82
83
84
85
|
# File 'lib/rsmp/site_proxy.rb', line 82
# Logs the reception of a command response and acknowledges it.
#
# @param message [#type] the received message
def process_command_response(message)
  log("Received #{message.type}", message: message, level: :log)
  acknowledge(message)
end
|
#process_message(message) ⇒ Object
#process_status_response(message) ⇒ Object
204
205
206
207
208
209
|
# File 'lib/rsmp/site_proxy.rb', line 204
# Handles an incoming status response: the component named by "cId"
# handles the response, then the reception is logged and the message is
# acknowledged.
#
# @param message [#attribute, #type] the received message
def process_status_response(message)
  target = find_component(message.attribute("cId"))
  target.handle_status_response(message)
  log("Received #{message.type}", message: message, level: :log)
  acknowledge(message)
end
|
#process_status_update(message) ⇒ Object
253
254
255
256
257
258
|
# File 'lib/rsmp/site_proxy.rb', line 253
# Handles an incoming status update: the component named by "cId" handles
# the update, then the reception is logged and the message is
# acknowledged.
#
# @param message [#attribute, #type] the received message
def process_status_update(message)
  target = find_component(message.attribute("cId"))
  target.handle_status_update(message)
  log("Received #{message.type}", message: message, level: :log)
  acknowledge(message)
end
|
#process_version(message) ⇒ Object
314
315
316
317
318
319
320
|
# File 'lib/rsmp/site_proxy.rb', line 314
# Handles an incoming Version message.
#
# Once the version has been determined (@version_determined is set by
# version_accepted), any further Version message is handed to the inherited
# extraneous_version handler. Previously such a message was simply returned,
# so nothing handled it at all.
# Otherwise the site ids, RSMP core version and SXL version are checked in
# that order and the version is accepted.
#
# @param message [Object] the received Version message
def process_version(message)
  return extraneous_version(message) if @version_determined

  check_site_ids message
  check_rsmp_version message
  check_sxl_version message
  version_accepted message
end
|
#process_watchdog(message) ⇒ Object
172
173
174
|
# File 'lib/rsmp/site_proxy.rb', line 172
# Delegates watchdog handling to the parent implementation without adding
# anything of its own.
#
# @param message [Object] the received watchdog message
def process_watchdog(message)
  super(message)
end
|
#request_aggregated_status(component, options = {}) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/rsmp/site_proxy.rb', line 113
# Sends an AggregatedStatusRequest for a component.
#
# Raises through validate_ready when the connection is not ready. The
# message id comes from options[:m_id] or is freshly generated. The block
# given to send_and_optionally_collect builds an AggregatedStatusCollector
# for one message with that id.
#
# @param component [Object] value sent as "cId"
# @param options [Hash] reads :m_id; passed on to send_and_optionally_collect
# @return [Object] whatever send_and_optionally_collect returns
def request_aggregated_status(component, options = {})
  validate_ready('request aggregated status')
  m_id = options[:m_id] || RSMP::Message.make_m_id
  attributes = {
    "ntsOId" => '',
    "xNId" => '',
    "cId" => component,
    "mId" => m_id
  }
  request = RSMP::AggregatedStatusRequest.new(attributes)
  send_and_optionally_collect(request, options) do |collect_options|
    collector_options = collect_options.merge(task: @task, m_id: m_id, num: 1)
    AggregatedStatusCollector.new(self, collector_options)
  end
end
|
#request_status(component, status_list, options = {}) ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/rsmp/site_proxy.rb', line 180
# Sends a StatusRequest for a component.
#
# Only the 'sCI' and 'n' keys of each status item are put in the request;
# the full status_list is handed to the StatusCollector built for
# send_and_optionally_collect. The message id comes from options[:m_id] or
# is freshly generated.
#
# @param component [Object] value sent as "cId"
# @param status_list [Array<Hash>] status items
# @param options [Hash] reads :m_id; passed on to send_and_optionally_collect
# @return [Object] whatever send_and_optionally_collect returns
def request_status(component, status_list, options = {})
  validate_ready('request status')
  m_id = options[:m_id] || RSMP::Message.make_m_id
  trimmed_items = status_list.map { |entry| entry.slice('sCI', 'n') }
  attributes = {
    "ntsOId" => '',
    "xNId" => '',
    "cId" => component,
    "sS" => trimmed_items,
    "mId" => m_id
  }
  request = RSMP::StatusRequest.new(attributes)
  send_and_optionally_collect(request, options) do |collect_options|
    collector_options = collect_options.merge(task: @task, m_id: m_id)
    StatusCollector.new(self, status_list, collector_options)
  end
end
|
#revive(options) ⇒ Object
31
32
33
34
35
|
# File 'lib/rsmp/site_proxy.rb', line 31
# Re-initializes the proxy through the parent's revive, then re-reads the
# supervisor from the options and takes a fresh clone of its settings.
#
# @param options [Hash] reads :supervisor; passed unchanged to the parent
def revive(options)
  super(options)
  @supervisor = options[:supervisor]
  @settings = @supervisor.supervisor_settings.clone
end
|
#run ⇒ Object
Handle communication. When we're created, the socket is already open.
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/rsmp/site_proxy.rb', line 19
# Handles communication. When the proxy is created the socket is already
# open, so the state is set to :connected right away, the reader is
# started, and the method waits for the reader.
#
# Error handling uses method-level rescue clauses. The previous
# `wait_for_reader rescue RSMP::ConnectionError => e` was a rescue
# *modifier*: it swallowed every StandardError raised while waiting and the
# following log call ran unconditionally.
#
# - RSMP::ConnectionError is logged at level :error.
# - Any other StandardError goes to notify_error with level :internal.
# - close is always called, whether or not an error occurred.
def run
  set_state :connected
  start_reader
  wait_for_reader
rescue RSMP::ConnectionError => e
  log e, level: :error
rescue StandardError => e
  notify_error e, level: :internal
ensure
  close
end
|
#send_alarm_acknowledgement(component, alarm_code, options = {}) ⇒ Object
260
261
262
263
264
265
266
267
|
# File 'lib/rsmp/site_proxy.rb', line 260
# Builds an AlarmAcknowledged message for the given component and alarm
# code, sends it (passing options[:validate] as the validate option) and
# returns the message.
#
# @param component [Object] value sent as "cId"
# @param alarm_code [Object] value sent as "aCId"
# @param options [Hash] reads :validate
# @return [RSMP::AlarmAcknowledged] the message that was sent
def send_alarm_acknowledgement(component, alarm_code, options = {})
  attributes = {
    "cId" => component,
    "aCId" => alarm_code,
  }
  acknowledgement = RSMP::AlarmAcknowledged.new(attributes)
  send_message(acknowledgement, validate: options[:validate])
  acknowledgement
end
|
#send_command(component, command_list, options = {}) ⇒ Object
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
|
# File 'lib/rsmp/site_proxy.rb', line 269
# Sends a CommandRequest to a component.
#
# Raises through validate_ready when the connection is not ready. The
# message id comes from options[:m_id] or is freshly generated. The block
# given to send_and_optionally_collect builds a CommandResponseCollector
# for the command list and that id.
#
# @param component [Object] value sent as "cId"
# @param command_list [Object] value sent as "arg"
# @param options [Hash] reads :m_id; passed on to send_and_optionally_collect
# @return [Object] whatever send_and_optionally_collect returns
def send_command(component, command_list, options = {})
  validate_ready('send command')
  m_id = options[:m_id] || RSMP::Message.make_m_id
  attributes = {
    "ntsOId" => '',
    "xNId" => '',
    "cId" => component,
    "arg" => command_list,
    "mId" => m_id
  }
  request = RSMP::CommandRequest.new(attributes)
  send_and_optionally_collect(request, options) do |collect_options|
    collector_options = collect_options.merge(task: @task, m_id: m_id)
    CommandResponseCollector.new(self, command_list, collector_options)
  end
end
|
#set_watchdog_interval(interval) ⇒ Object
288
289
290
|
# File 'lib/rsmp/site_proxy.rb', line 288
# Sets the watchdog interval in this proxy's settings.
#
# @settings is produced with a shallow `clone` of the supervisor settings
# (see initialize/revive), so the nested 'intervals' hash is the same
# object the supervisor holds. Writing into it in place would also change
# the supervisor's settings. A fresh 'intervals' hash is stored instead, so
# only this proxy is affected. A missing 'intervals' entry is created
# rather than raising NoMethodError.
#
# @param interval [Object] new value for @settings['intervals']['watchdog']
# @return [Object] the interval that was given
def set_watchdog_interval(interval)
  intervals = @settings['intervals'] || {}
  @settings['intervals'] = intervals.merge('watchdog' => interval)
  interval
end
|
#setup_site_settings ⇒ Object
346
347
348
349
350
351
352
353
354
|
# File 'lib/rsmp/site_proxy.rb', line 346
# Loads the settings for @site_id and applies them: @sxl is taken from the
# 'sxl' entry and the components are set up from the 'components' entry.
#
# When no settings exist for the site, the connection must be rejected.
# The previous code referenced an undefined `message` here and so failed
# with NameError. The message to reject can now be passed in as an optional
# argument; it is then rejected via dont_acknowledge. Callers that pass
# nothing get an error whose text names the actual problem.
#
# @param message [Object, nil] the message to reject when no config exists
# @raise [RuntimeError] when no config exists and no message was given
def setup_site_settings(message = nil)
  @site_settings = find_site_settings @site_id
  if @site_settings
    @sxl = @site_settings['sxl']
    return setup_components(@site_settings['components'])
  end

  reason = "No config found for site #{@site_id}"
  raise reason unless message

  dont_acknowledge message, 'Rejected', reason
end
|
#site_ids_changed ⇒ Object
176
177
178
|
# File 'lib/rsmp/site_proxy.rb', line 176
# Tells the supervisor that site ids changed.
#
# @return [Object] whatever the supervisor's site_ids_changed returns
def site_ids_changed
  @supervisor.site_ids_changed()
end
|
#subscribe_to_status(component_id, status_list, options = {}) ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
# File 'lib/rsmp/site_proxy.rb', line 211
# Subscribes to statuses of a component.
#
# Only the 'sCI', 'n' and 'uRt' keys of each status item are used for the
# subscription; the full status_list is handed to the StatusCollector
# built for send_and_optionally_collect. The component is told about the
# subscription before the StatusSubscribe message is sent. The message id
# comes from options[:m_id] or is freshly generated.
#
# @param component_id [Object] component to look up, also sent as "cId"
# @param status_list [Array<Hash>] status items
# @param options [Hash] reads :m_id; passed on to send_and_optionally_collect
# @return [Object] whatever send_and_optionally_collect returns
def subscribe_to_status(component_id, status_list, options = {})
  validate_ready('subscribe to status')
  m_id = options[:m_id] || RSMP::Message.make_m_id
  trimmed_items = status_list.map { |entry| entry.slice('sCI', 'n', 'uRt') }
  find_component(component_id).handle_status_subscribe(trimmed_items)
  attributes = {
    "ntsOId" => '',
    "xNId" => '',
    "cId" => component_id,
    "sS" => trimmed_items,
    'mId' => m_id
  }
  request = RSMP::StatusSubscribe.new(attributes)
  send_and_optionally_collect(request, options) do |collect_options|
    collector_options = collect_options.merge(task: @task, m_id: m_id)
    StatusCollector.new(self, status_list, collector_options)
  end
end
|
#sxl_version ⇒ Object
308
309
310
311
312
|
# File 'lib/rsmp/site_proxy.rb', line 308
# Reader for @site_sxl_version, which check_sxl_version stores.
#
# @return [Object] the stored SXL version, or nil when none has been stored
def sxl_version
  @site_sxl_version
end
|
#unsubscribe_to_status(component_id, status_list, options = {}) ⇒ Object
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
# File 'lib/rsmp/site_proxy.rb', line 238
# Unsubscribes from statuses of a component.
#
# Readiness is checked first, as the other request methods in this class
# do. Previously the component was updated before the check, so a call on a
# connection that was not ready changed local state and then raised.
#
# @param component_id [Object] component to look up, also sent as "cId"
# @param status_list [Object] value sent as "sS"
# @param options [Hash] reads :validate
# @return [RSMP::StatusUnsubscribe] the message that was sent
def unsubscribe_to_status(component_id, status_list, options = {})
  validate_ready 'unsubscribe to status'

  # NOTE(review): this notifies the component through
  # handle_status_subscribe even though we are unsubscribing; confirm
  # whether the component offers a dedicated unsubscribe handler.
  component = find_component component_id
  component.handle_status_subscribe status_list

  message = RSMP::StatusUnsubscribe.new({
    "ntsOId" => '',
    "xNId" => '',
    "cId" => component_id,
    "sS" => status_list
  })
  send_message message, validate: options[:validate]
  message
end
|
#validate_aggregated_status(message, se) ⇒ Object
131
132
133
134
135
136
137
|
# File 'lib/rsmp/site_proxy.rb', line 131
# Ensures the 'se' value of an aggregated status message is an Array with
# exactly 8 elements.
#
# On invalid input the message is rejected via dont_acknowledge and
# InvalidMessage is raised. The rejection previously referenced a
# misspelled variable (`reaons`), which raised NameError before either of
# those could happen.
#
# @param message [Object] the message being validated
# @param se [Object] the message's 'se' value
# @return [nil] when the value is valid
# @raise [InvalidMessage] when 'se' is not an Array of size 8
def validate_aggregated_status(message, se)
  return if se.is_a?(Array) && se.size == 8

  reason = "invalid AggregatedStatus, 'se' must be an Array of size 8"
  dont_acknowledge message, "Received", reason
  raise InvalidMessage
end
|
#validate_ready(action) ⇒ Object
109
110
111
|
# File 'lib/rsmp/site_proxy.rb', line 109
# Guards an action that needs a ready connection.
#
# @param action [String] description of the action, used in the error text
# @return [nil] when ready? is true
# @raise [NotReady] when ready? is false; the message names the action and
#   the current @state
def validate_ready(action)
  return if ready?

  raise NotReady, "Can't #{action} because connection is not ready. (Currently #{@state})"
end
|
#version_accepted(message) ⇒ Object
87
88
89
90
91
92
93
|
# File 'lib/rsmp/site_proxy.rb', line 87
# Completes acceptance of a Version message, in this order: log the
# reception, start the timer, acknowledge the message, send our own
# version for @site_id, and finally mark the version as determined.
#
# @param message [Object] the accepted Version message
# @return [true]
def version_accepted(message)
  log("Received Version message for site #{@site_id}", message: message, level: :log)
  start_timer
  acknowledge(message)
  send_version(@site_id, rsmp_versions)
  @version_determined = true
end
|
#version_acknowledged ⇒ Object
169
170
|
# File 'lib/rsmp/site_proxy.rb', line 169
# Deliberately empty: nothing is done here and nil is returned.
def version_acknowledged
  nil
end
|