Class: RSMP::SupervisorProxy

Inherits:
Proxy
  • Object
show all
Includes:
Modules::AggregatedStatus, Modules::Alarms, Modules::Commands, Modules::Status
Defined in:
lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb

Overview

Proxy used by sites to connect to a remote supervisor.

Defined Under Namespace

Modules: Modules

Constant Summary

Constants inherited from Proxy

Proxy::WRAPPING_DELIMITER

Instance Attribute Summary collapse

Attributes inherited from Proxy

#accepted_sxls, #archive, #collector, #connection_info, #core_version, #ip, #node, #port, #rejected_sxls, #site_settings, #state, #sxl_interfaces, #sxls

Attributes included from Task

#task

Attributes included from Distributor

#receivers

Attributes included from Logging

#archive, #logger

Instance Method Summary collapse

Methods included from Modules::AggregatedStatus

#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status

Methods included from Modules::Alarms

#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms

Methods included from Modules::Commands

#build_command_rvs, #execute_commands, #process_command_request, #simplify_command_requests

Methods included from Modules::Status

#add_status_subscription, #build_status_list, #build_undefined_statuses, #check_on_change_update, #check_status_subscription, #fetch_last_sent_status, #fetch_status_values, #get_status_subscribe_interval, #interval_update_due?, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #remove_status_subscription, #rsmpify_value, #send_status_updates, #status_update_timer, #store_last_sent_status

Methods inherited from Proxy

#author, #build_sxl_interfaces, #clear, #clock, #close, #close_socket, #close_stream, #connected?, #disconnect, #disconnected?, #inspect, #log, #now, #primary_sxl, #ready?, #receive_alarms?, #receive_error, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, #sxl, #sxl_interface, #sxl_interface_for, #sxl_version, #tlc, version_meets_requirement?, #wait_for_reader

Methods included from Proxy::Modules::Tasks

#read_line, #run_reader, #run_timer, #start_reader, #start_timer

Methods included from Proxy::Modules::Versions

#check_core_version, #configured_sxls, #core_3_3?, #core_versions, #extraneous_version, #normalized_core_versions, #primary_configured_sxl, #send_version, #send_version_message, #send_version_request, #send_version_response, #site_id_items, #sxl_request_items, #version_acknowledged, #version_items, version_meets_requirement?, #version_meets_requirement?, #version_message_attributes, #version_request_attributes, #version_response_sxls

Methods included from Proxy::Modules::Receive

#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle

Methods included from Proxy::Modules::Send

#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_message, #send_message_and_collect

Methods included from Proxy::Modules::Acknowledgements

#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged

Methods included from Proxy::Modules::Watchdogs

#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled

Methods included from Proxy::Modules::State

#wait_for_state

Methods included from Task

#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition

Methods included from Distributor

#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options) ⇒ SupervisorProxy

Returns a new instance of SupervisorProxy.



13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 13

# Builds a proxy for one supervisor connection on behalf of a site.
#
# @param options [Hash] read for :site, :ip and :port; the whole hash is forwarded to the
#   parent constructor with :node set to the site.
def initialize(options)
  site = options[:site]
  super(options.merge(node: site))
  @site = site
  @site_settings = site.site_settings.clone # own copy of the site's settings (`clone` is shallow)
  @ip, @port = options.values_at(:ip, :port)
  @status_subscriptions = {}
  # Start out with every configured SXL accepted; check_sxl_version replaces these lists
  # once a Version message has been processed.
  @sxls = configured_sxls
  @accepted_sxls = @sxls.dup
  @rejected_sxls = []
  @synthetic_id = Supervisor.build_id_from_ip_port(@ip, @port)
end

Instance Attribute Details

#site ⇒ Object (readonly)

Returns the value of attribute site.



11
12
13
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 11

# @return [Object] the site stored in @site (set from options[:site] by the constructor)
def site = @site

#supervisor_id ⇒ Object (readonly)

Returns the value of attribute supervisor_id.



11
12
13
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 11

# @return [Object, nil] the current value of @supervisor_id
# NOTE(review): none of the methods shown for this class assign @supervisor_id (the
# constructor sets @synthetic_id, process_version sets @site_id) — confirm where it is set.
def supervisor_id = @supervisor_id

Instance Method Details

#acknowledged_first_ingoing(message) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 144

# Continues the handshake for a 'Watchdog' message; any other message type is ignored.
# With core 3.3 the component list is sent next, otherwise the handshake is completed
# right away.
# NOTE(review): presumably called once our first ingoing message of a type has been
# acknowledged — confirm against the caller.
#
# @param message [#type] message whose type decides whether anything happens
# @return [Object, nil] result of the step taken, or nil for non-Watchdog messages
def acknowledged_first_ingoing(message)
  return unless message.type == 'Watchdog'

  core_3_3? ? send_component_list : handshake_complete
end

#check_sxl_version(message) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 187

# Works out which SXLs are in use from a Version message, then rebuilds the SXL interfaces.
#
# With core 3.3 the message's SXL items are split on their 'rejected' entry and the
# 'receiveAlarms' attribute is recorded (true when absent). Otherwise only the primary
# configured SXL is accepted.
#
# @param message [#sxls, #attributes] the received Version message
# @raise [HandshakeError] in the non-3.3 case when no primary SXL is configured
# @return [Object] whatever build_sxl_interfaces returns
def check_sxl_version(message)
  if core_3_3?
    rejected, accepted = message.sxls.partition { |sxl| sxl['rejected'] }
    @rejected_sxls = rejected
    @accepted_sxls = accepted
    @receive_alarms = message.attributes.fetch('receiveAlarms', true)
  else
    legacy_sxl = primary_configured_sxl
    unless legacy_sxl
      raise HandshakeError, 'Legacy Version response received, but no SXL is configured'
    end

    @accepted_sxls = [legacy_sxl]
    @rejected_sxls = []
  end
  build_sxl_interfaces
end

#component_list_acknowledged ⇒ Object



205
206
207
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 205

# Completes the handshake; simply delegates to handshake_complete.
# @return [Object] whatever handshake_complete returns
def component_list_acknowledged = handshake_complete

#connect ⇒ Object

Connect to the supervisor; #run then initiates the handshake.



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 55

# Opens the connection to the supervisor at @ip:@port and unmutes logging for that address.
# Any failure is re-raised as a ConnectionError carrying the address and cause.
#
# @raise [ConnectionError] for system call errors (with errno) and any other StandardError
def connect
  log("Connecting to supervisor at #{@ip}:#{@port}", level: :info)
  self.state = :connecting
  connect_tcp
  @logger.unmute(@ip, @port)
  log("Connected to supervisor at #{@ip}:#{@port}", level: :info)
rescue SystemCallError => error
  # OS-level failures get their errno included in the message.
  raise ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{error.errno} #{error}"
rescue StandardError => error
  raise ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{error}"
end

#connect_tcp ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 72

# Opens the TCP socket to @ip:@port and wraps it in a buffered stream and an RSMP protocol
# object, then moves the proxy to the :connected state.
#
# Settings read: 'timeouts'/'connect' (falls back to 1.1) and the optional
# 'intervals'/'after_connect' pause.
#
# @raise [Errno::ECONNREFUSED] re-raised after a warning has been logged
def connect_tcp
  @endpoint = IO::Endpoint.tcp(@ip, @port)

  # Bounding the attempt is a workaround for connect hanging on windows if the other side
  # is not present yet.
  connect_timeout = @site_settings.dig('timeouts', 'connect') || 1.1
  task.with_timeout(connect_timeout) { @socket = @endpoint.connect }

  settle_delay = @site_settings.dig('intervals', 'after_connect')
  task.sleep(settle_delay) if settle_delay

  @stream = IO::Stream::Buffered.new(@socket)
  @protocol = RSMP::Protocol.new(@stream) # rsmp messages are json terminated with a form-feed
  self.state = :connected
rescue Errno::ECONNREFUSED
  # Log a one-line warning here, then hand the same exception on to the caller.
  log('Connection refused', level: :warning)
  raise
end

#handle_interface_request(message) ⇒ Object



123
124
125
126
127
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 123

# Hands a message to its SXL interface: looks the interface up, validates the message
# against it, then lets it process the message.
#
# @param message [Object] the message to route
# @return [Object] the interface's process_message result
def handle_interface_request(message)
  sxl_interface_for(message).then do |sxl|
    sxl.validate_message!(message)
    sxl.process_message(message)
  end
end

#handshake_complete ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 91

# Finishes the handshake: logs the negotiated core version and accepted SXLs, switches to
# the :ready state and starts the watchdog. When the 'send_after_connect' setting is truthy
# it also sends all aggregated statuses, plus the active alarms if receive_alarms? is true.
# The inherited handshake_complete runs last.
#
# @return [Object] whatever the inherited handshake_complete returns
def handshake_complete
  sxl_labels = accepted_sxls.map { |item| "#{item['name']} #{item['version']}" }
  sxl_summary = sxl_labels.join(', ')
  log(
    "Connection to supervisor established, using core #{@core_version}, SXLs [#{sxl_summary}]",
    level: :info
  )
  self.state = :ready
  start_watchdog

  announce = @site_settings['send_after_connect']
  send_all_aggregated_status if announce
  send_active_alarms if announce && receive_alarms?
  super
end

#main ⇒ Object



209
210
211
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 209

# Delegates to the owning site.
# @return [Object] whatever @site.main returns
def main = @site.main

#process_message(message) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 104

# Dispatches an incoming message by class.
#
# Status responses/updates, aggregated status and alarm issues are passed to
# will_not_handle; SXL-related requests go through handle_interface_request; everything not
# listed falls through to the inherited process_message. The listed lookup/validation
# errors are answered with dont_acknowledge, using the error text as the reason.
#
# NOTE(review): process_command_response does not appear in the method lists shown for this
# class or its modules — confirm it is defined, otherwise a CommandResponse fails here.
#
# @param message [Object] the received message
# @return [Object] result of the chosen handler
def process_message(message)
  case message
  when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue then will_not_handle(message)
  when AggregatedStatusRequest then process_aggregated_status_request(message)
  when CommandResponse then process_command_response(message)
  when CommandRequest, StatusRequest, StatusSubscribe, StatusUnsubscribe,
       Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    handle_interface_request(message)
  else
    super
  end
rescue UnknownComponent, UnknownCommand, UnknownStatus, MessageRejected, MissingAttribute => error
  dont_acknowledge(message, '', error.to_s)
end

#process_sxl_request(message) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 129

# Routes an SXL-related message to the matching handler, chosen by message class.
# Messages of any other class are ignored.
#
# @param message [Object] command, status or alarm message
# @return [Object, nil] the handler's result, or nil when no branch matches
def process_sxl_request(message)
  case message
  when CommandRequest then process_command_request(message)
  when StatusRequest then process_status_request(message)
  when StatusSubscribe then process_status_subcribe(message)
  when StatusUnsubscribe then process_status_unsubcribe(message)
  when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest then process_alarm(message)
  end
end

#process_version(message) ⇒ Object



178
179
180
181
182
183
184
185
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 178

# Handles a Version message. The first one triggers the core and SXL checks and is then
# accepted; once @version_determined is set, further ones go to extraneous_version.
#
# NOTE(review): @site_id receives the same Supervisor.build_id_from_ip_port value the
# constructor stores in @synthetic_id — confirm the ivar name is intended.
#
# @param message [Object] the received Version message
# @return [Object] result of version_accepted or extraneous_version
def process_version(message)
  if @version_determined
    extraneous_version(message)
  else
    check_core_version(message)
    check_sxl_version(message)
    @site_id = Supervisor.build_id_from_ip_port(@ip, @port)
    version_accepted(message)
  end
end

#reconnect_delay? ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
158
159
160
161
162
163
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 155

# Tells #run whether to try connecting again, after waiting out the reconnect interval.
#
# The interval is read from the 'intervals'/'reconnect' site setting. When it is :no, or
# not configured at all, no reconnect is attempted. Otherwise the interval is logged,
# logging for @ip/@port is muted and the task sleeps for the interval.
#
# @return [Boolean] true after sleeping, false when reconnecting is disabled or unconfigured
def reconnect_delay?
  # dig (as in connect_tcp) instead of chained [] lookups: a missing 'intervals' section
  # used to raise NoMethodError here, and a missing 'reconnect' value was passed on to
  # @task.sleep as nil — both while #run was already handling a disconnect.
  interval = @site_settings.dig('intervals', 'reconnect')
  return false if interval.nil? || interval == :no

  log "Will try to reconnect again every #{interval} seconds...", level: :info
  @logger.mute @ip, @port
  @task.sleep interval
  true
end

#run ⇒ Object

Handle communication with the supervisor. If disconnected, try to reconnect.



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 28

# Main connection loop: connect, start the reader, start the handshake, then block until
# the reader finishes. After each pass — normal or failed — reconnect_delay? decides
# whether to loop again or leave.
#
# @raise [Restart] re-raised after muting logging for @ip/@port
def run
  loop do
    connect
    start_reader
    start_handshake
    wait_for_reader # run until disconnected
    break unless reconnect_delay?
  rescue Restart
    # A restart is not handled here: mute logging for this address and pass it on.
    @logger.mute @ip, @port
    raise
  rescue RSMP::ConnectionError => e
    # Connection problems are logged as errors, then retried like a normal disconnect.
    log e, level: :error
    break unless reconnect_delay?
  rescue StandardError => e
    # Anything else is reported through distribute_error, then retried as well.
    distribute_error e, level: :internal
    break unless reconnect_delay?
  ensure
    # Runs at the end of every iteration, including when leaving via break or raise.
    close
    stop_subtasks
  end
end

#send_component_list ⇒ Object



201
202
203
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 201

def send_component_list
  send_message ComponentList.new('components' => @site.component_list)
end

#start_handshake ⇒ Object



50
51
52
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 50

# Starts the handshake by sending a version request with the configured 'site_id' setting
# and the result of core_versions.
# @return [Object] whatever send_version_request returns
def start_handshake
  site_id = @site_settings['site_id']
  send_version_request(site_id, core_versions)
end

#stop_task ⇒ Object



67
68
69
70
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 67

# Runs the inherited stop_task, then resets @last_status_sent to nil.
# @return [nil]
def stop_task
  super
  # presumably the record kept by store_last_sent_status / fetch_last_sent_status — TODO confirm
  @last_status_sent = nil
end

#timer(now) ⇒ Object



173
174
175
176
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 173

# Timer tick: runs the inherited timer first, then the status update timer — but only
# while the proxy is ready.
#
# @param now [Object] current time, passed through to both timers
# @return [Object, nil] result of status_update_timer, or nil when not ready
def timer(now)
  super
  return unless ready?

  status_update_timer(now)
end

#version_accepted(message) ⇒ Object



165
166
167
168
169
170
171
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 165

# Accepts a Version message: logs it, starts the timer, acknowledges the message, marks the
# version as determined and sends a watchdog — in that order.
#
# @param message [Object] the Version message being accepted
# @return [Object] whatever send_watchdog returns
def version_accepted(message)
  log("Received Version message, using RSMP #{@core_version}", message: message, level: :log)
  start_timer
  acknowledge(message)
  # From here on process_version treats further Version messages as extraneous.
  @version_determined = true
  send_watchdog
end