Module: RSMP::SupervisorProxy::Modules::Status
Overview
Status request and subscription handling
Instance Method Summary
collapse
-
#add_status_subscription(component_id, subs, update_list, arg, now) ⇒ Object
-
#build_undefined_statuses(args) ⇒ Object
-
#fetch_status_value(component, arg) ⇒ Object
-
#fetch_status_values(component, args) ⇒ Object
-
#get_status_subscribe_interval(component_id, sci, name) ⇒ Object
-
#process_status_request(message, options = {}) ⇒ Object
-
#process_status_subcribe(message) ⇒ Object
-
#process_status_unsubcribe(message) ⇒ Object
-
#prune_unbuffered_status_subscriptions ⇒ Object
-
#remove_status_subscription(subs, arg) ⇒ Object
-
#rsmpify_value(value, quality) ⇒ Object
#add_status_update, #build_status_item, #build_status_list, #check_on_change_update, #check_status_subscription, #collect_component_status_updates, #current_status_value, #each_status_name, #encode_status_value, #fetch_last_sent_status, #interval_update_due?, #precomputed_status_value, #send_component_status_update, #send_status_updates, #status_item_value, #status_update_timer, #status_updates_due, #store_last_sent_status, #store_last_sent_status_item
Instance Method Details
#add_status_subscription(component_id, subs, update_list, arg, now) ⇒ Object
60
61
62
63
64
65
66
67
68
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 60
def add_status_subscription(component_id, subs, update_list, arg, now)
sci = arg['sCI']
name = arg['n']
subcription = { interval: arg['uRt'].to_i, last_sent_at: now }
subs[sci] ||= {}
subs[sci][name] = subcription
update_list[component_id][sci] ||= []
update_list[component_id][sci] << name
end
|
#build_undefined_statuses(args) ⇒ Object
30
31
32
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 30
def build_undefined_statuses(args)
args.map { |arg| arg.dup.merge('q' => 'undefined', 's' => nil) }
end
|
#fetch_status_value(component, arg) ⇒ Object
22
23
24
25
26
27
28
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 22
def fetch_status_value(component, arg)
value, quality = component.get_status arg['sCI'], arg['n'], { sxl_version: sxl_version }
{ 's' => rsmpify_value(value, quality), 'q' => quality.to_s }.merge arg
rescue UnknownStatus => e
log e.to_s, level: :warning
{ 's' => nil, 'q' => 'unknown' }.merge arg
end
|
#fetch_status_values(component, args) ⇒ Object
16
17
18
19
20
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 16
def fetch_status_values(component, args)
args.map do |arg|
fetch_status_value component, arg
end
end
|
#get_status_subscribe_interval(component_id, sci, name) ⇒ Object
87
88
89
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 87
def get_status_subscribe_interval(component_id, sci, name)
@status_subscriptions.dig component_id, sci, name
end
|
#process_status_request(message, options = {}) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 34
def process_status_request(message, options = {})
component_id = message.attributes['cId']
args = message.attributes['sS']
begin
component = @site.find_component component_id
ss = fetch_status_values(component, args)
log "Received #{message.type}", message: message, level: :log
rescue UnknownComponent
log "Received #{message.type} with unknown component id '#{component_id}' and cannot infer type",
message: message, level: :warning
ss = build_undefined_statuses(args)
end
response = StatusResponse.new({
'cId' => component_id,
'sTs' => clock.to_s,
'sS' => ss,
'mId' => options[:m_id]
})
apply_nts_message_attributes response
acknowledge message
send_message response
end
|
#process_status_subcribe(message) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 70
def process_status_subcribe(message)
log "Received #{message.type}", message: message, level: :log
update_list = {}
component_id = message.attributes['cId']
@status_subscriptions[component_id] ||= {}
update_list[component_id] ||= {}
now = Time.now
subs = @status_subscriptions[component_id]
message.attributes['sS'].each do |arg|
add_status_subscription(component_id, subs, update_list, arg, now)
end
acknowledge message
send_status_updates update_list
end
|
#process_status_unsubcribe(message) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 99
def process_status_unsubcribe(message)
log "Received #{message.type}", message: message, level: :log
component = message.attributes['cId']
subs = @status_subscriptions[component]
if subs
message.attributes['sS'].each { |arg| remove_status_subscription(subs, arg) }
@status_subscriptions.delete(component) if subs.empty?
end
acknowledge message
end
|
#prune_unbuffered_status_subscriptions ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 111
def prune_unbuffered_status_subscriptions
@status_subscriptions.each_key.to_a.each do |component_id|
by_code = @status_subscriptions[component_id]
by_code.each_key.to_a.each do |code|
by_name = by_code[code]
by_name.delete_if do |name, _subscription|
status = { 'sCI' => code, 'n' => name }
!status_buffer_selector?(component_id, status)
end
by_code.delete(code) if by_name.empty?
end
@status_subscriptions.delete(component_id) if by_code.empty?
end
end
|
#remove_status_subscription(subs, arg) ⇒ Object
91
92
93
94
95
96
97
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 91
def remove_status_subscription(subs, arg)
sci = arg['sCI']
return unless subs[sci]
subs[sci].delete arg['n']
subs.delete(sci) if subs[sci].empty?
end
|
#rsmpify_value(value, quality) ⇒ Object
8
9
10
11
12
13
14
|
# File 'lib/rsmp/proxy/supervisor/modules/status.rb', line 8
def rsmpify_value(value, quality)
if %w[undefined unknown].include?(quality.to_s)
nil
else
value
end
end
|