Class: RSMP::SupervisorProxy
- Defined in:
- lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb
Overview
Proxy used by sites to connect to a remote supervisor.
Defined Under Namespace
Modules: Modules
Constant Summary
Constants inherited from Proxy
Instance Attribute Summary collapse
-
#site ⇒ Object
readonly
Returns the value of attribute site.
-
#supervisor_id ⇒ Object
readonly
Returns the value of attribute supervisor_id.
Attributes inherited from Proxy
#accepted_sxls, #archive, #collector, #connection_info, #core_version, #ip, #node, #port, #rejected_sxls, #site_settings, #state, #sxl_interfaces, #sxls
Attributes included from Task
Attributes included from Distributor
Attributes included from Logging
Instance Method Summary collapse
- #acknowledged_first_ingoing(message) ⇒ Object
- #check_sxl_version(message) ⇒ Object
- #component_list_acknowledged ⇒ Object
-
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
- #connect_tcp ⇒ Object
- #handle_interface_request(message) ⇒ Object
- #handshake_complete ⇒ Object
-
#initialize(options) ⇒ SupervisorProxy
constructor
A new instance of SupervisorProxy.
- #main ⇒ Object
- #process_message(message) ⇒ Object
- #process_sxl_request(message) ⇒ Object
- #process_version(message) ⇒ Object
- #reconnect_delay? ⇒ Boolean
-
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
- #send_component_list ⇒ Object
- #start_handshake ⇒ Object
- #stop_task ⇒ Object
- #timer(now) ⇒ Object
- #version_accepted(message) ⇒ Object
Methods included from Modules::AggregatedStatus
#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status
Methods included from Modules::Alarms
#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms
Methods included from Modules::Commands
#build_command_rvs, #execute_commands, #process_command_request, #simplify_command_requests
Methods included from Modules::Status
#add_status_subscription, #build_status_list, #build_undefined_statuses, #check_on_change_update, #check_status_subscription, #fetch_last_sent_status, #fetch_status_values, #get_status_subscribe_interval, #interval_update_due?, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #remove_status_subscription, #rsmpify_value, #send_status_updates, #status_update_timer, #store_last_sent_status
Methods inherited from Proxy
#author, #build_sxl_interfaces, #clear, #clock, #close, #close_socket, #close_stream, #connected?, #disconnect, #disconnected?, #inspect, #log, #now, #primary_sxl, #ready?, #receive_alarms?, #receive_error, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, #sxl, #sxl_interface, #sxl_interface_for, #sxl_version, #tlc, version_meets_requirement?, #wait_for_reader
Methods included from Proxy::Modules::Tasks
#read_line, #run_reader, #run_timer, #start_reader, #start_timer
Methods included from Proxy::Modules::Versions
#check_core_version, #configured_sxls, #core_3_3?, #core_versions, #extraneous_version, #normalized_core_versions, #primary_configured_sxl, #send_version, #send_version_message, #send_version_request, #send_version_response, #site_id_items, #sxl_request_items, #version_acknowledged, #version_items, version_meets_requirement?, #version_meets_requirement?, #version_message_attributes, #version_request_attributes, #version_response_sxls
Methods included from Proxy::Modules::Receive
#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle
Methods included from Proxy::Modules::Send
#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_message, #send_message_and_collect
Methods included from Proxy::Modules::Acknowledgements
#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged
Methods included from Proxy::Modules::Watchdogs
#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled
Methods included from Proxy::Modules::State
Methods included from Task
#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition
Methods included from Distributor
#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options) ⇒ SupervisorProxy
Returns a new instance of SupervisorProxy.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 13
#
# Builds a proxy for a single supervisor connection.
#
# The options are forwarded to the parent constructor with :node set to the
# site, and the site's settings are cloned into this proxy.
#
# NOTE(review): the `options` identifier is restored from the documented
# signature `#initialize(options)`; confirm against the source file.
#
# @param options [Hash] read for the :site, :ip and :port entries
# @return [SupervisorProxy] a new instance of SupervisorProxy
def initialize(options)
  super(options.merge(node: options[:site]))
  @site = options[:site]
  @site_settings = @site.site_settings.clone
  @ip = options[:ip]
  @port = options[:port]
  @status_subscriptions = {}
  @sxls = configured_sxls
  # Accepted list starts as a copy of the configured SXLs, rejected list empty.
  @accepted_sxls = @sxls.dup
  @rejected_sxls = []
  @synthetic_id = Supervisor.build_id_from_ip_port @ip, @port
end
Instance Attribute Details
#site ⇒ Object (readonly)
Returns the value of attribute site.
11 12 13 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 11
#
# Read-only accessor for the site attribute.
#
# @return [Object] the current value of @site
def site
  @site
end
#supervisor_id ⇒ Object (readonly)
Returns the value of attribute supervisor_id.
11 12 13 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 11
#
# Read-only accessor for the supervisor_id attribute.
#
# NOTE(review): none of the method listings on this page assign
# @supervisor_id — confirm where (or whether) it is set.
#
# @return [Object] the current value of @supervisor_id
def supervisor_id
  @supervisor_id
end
Instance Method Details
#acknowledged_first_ingoing(message) ⇒ Object
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 144
#
# Reacts to the acknowledgement of the first ingoing message of a given type.
#
# Only 'Watchdog' is acted on: when core_3_3? is true the component list is
# sent, otherwise the handshake is completed directly. Other types are ignored.
#
# NOTE(review): the `message` identifier is restored from the documented
# signature; confirm against the source file.
#
# @param message [Object] message whose #type is inspected
# @return [Object]
def acknowledged_first_ingoing(message)
  case message.type
  when 'Watchdog'
    if core_3_3?
      send_component_list
    else
      handshake_complete
    end
  end
end
#check_sxl_version(message) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 187
#
# Derives the accepted and rejected SXL lists from a Version response and then
# rebuilds the SXL interfaces.
#
# When core_3_3? is true, message.sxls is split on each item's 'rejected' key
# and 'receiveAlarms' is read from the message attributes (default true).
# Otherwise the primary configured SXL becomes the only accepted one.
#
# NOTE(review): the `message` identifier is restored from the documented
# signature; confirm against the source file.
#
# @param message [Object] Version response providing #sxls and #attributes
# @return [Object] result of build_sxl_interfaces
# @raise [HandshakeError] in the legacy branch when no SXL is configured
def check_sxl_version(message)
  if core_3_3?
    # partition returns [truthy, falsy], i.e. rejected first, accepted second
    @rejected_sxls, @accepted_sxls = message.sxls.partition { |item| item['rejected'] }
    @receive_alarms = message.attributes.fetch('receiveAlarms', true)
  else
    primary = primary_configured_sxl
    raise HandshakeError, 'Legacy Version response received, but no SXL is configured' unless primary

    @accepted_sxls = [primary]
    @rejected_sxls = []
  end
  build_sxl_interfaces
end
#component_list_acknowledged ⇒ Object
205 206 207 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 205
#
# Hook that finishes the handshake by delegating to handshake_complete.
#
# @return [Object] whatever handshake_complete returns
def component_list_acknowledged
  handshake_complete
end
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 55
#
# Opens the connection to the supervisor at @ip:@port.
#
# Sets the state to :connecting, delegates the socket work to connect_tcp,
# then unmutes the logger for this ip/port and logs the result.
#
# @return [Object]
# @raise [ConnectionError] wrapping any SystemCallError or other StandardError
#   raised while connecting
def connect
  log("Connecting to supervisor at #{@ip}:#{@port}", level: :info)
  self.state = :connecting
  connect_tcp
  @logger.unmute(@ip, @port)
  log("Connected to supervisor at #{@ip}:#{@port}", level: :info)
rescue SystemCallError => e
  # OS-level failures carry an errno, which is included in the message
  raise ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}"
rescue StandardError => e
  raise ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}"
end
#connect_tcp ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 72
#
# Establishes the TCP socket and wraps it in a buffered stream and an RSMP
# protocol object, then sets the state to :connected.
#
# Reads 'timeouts' => 'connect' (falling back to 1.1) and the optional
# 'intervals' => 'after_connect' pause from the site settings.
#
# @return [Object]
# @raise [Errno::ECONNREFUSED] re-raised after a warning has been logged
def connect_tcp
  @endpoint = IO::Endpoint.tcp(@ip, @port)

  # this timeout is a workaround for connect hanging on windows if the other side is not present yet
  connect_timeout = @site_settings.dig('timeouts', 'connect') || 1.1
  task.with_timeout(connect_timeout) { @socket = @endpoint.connect }

  settle_delay = @site_settings.dig('intervals', 'after_connect')
  task.sleep(settle_delay) if settle_delay

  @stream = IO::Stream::Buffered.new(@socket)
  # rsmp messages are json terminated with a form-feed
  @protocol = RSMP::Protocol.new(@stream)
  self.state = :connected
rescue Errno::ECONNREFUSED
  # rescue to avoid log output
  log('Connection refused', level: :warning)
  raise
end
#handle_interface_request(message) ⇒ Object
123 124 125 126 127 |
# Looks up an SXL interface with sxl_interface_for and then makes two calls on
# the returned interface object.
# NOTE(review): this rendered listing has lost the `message` parameter, the
# argument passed to sxl_interface_for, and the names of both methods invoked
# on `interface`. Consult line 123 of
# lib/rsmp/proxy/supervisor/supervisor_proxy.rb before relying on this snippet.
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 123 def handle_interface_request() interface = sxl_interface_for interface. interface. end |
#handshake_complete ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 91
#
# Finalises the handshake: logs the negotiated core version and accepted SXLs,
# sets the state to :ready and starts the watchdog.
#
# When the 'send_after_connect' site setting is truthy, all aggregated statuses
# are sent, followed by active alarms if receive_alarms? is true. The parent
# implementation runs last.
#
# @return [Object] result of the parent implementation
def handshake_complete
  sxl_summary = accepted_sxls
                .map { |item| "#{item['name']} #{item['version']}" }
                .join(', ')
  log("Connection to supervisor established, using core #{@core_version}, SXLs [#{sxl_summary}]", level: :info)

  self.state = :ready
  start_watchdog

  if @site_settings['send_after_connect']
    send_all_aggregated_status
    send_active_alarms if receive_alarms?
  end

  super
end
#main ⇒ Object
209 210 211 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 209
#
# Delegates to the owning site's #main.
#
# @return [Object] whatever @site.main returns
def main
  @site.main
end
#process_message(message) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 104
#
# Dispatches an incoming message according to its class.
#
# Status/alarm reports (StatusResponse, StatusUpdate, AggregatedStatus,
# AlarmIssue) go to will_not_handle; command, status and alarm requests are
# forwarded to handle_interface_request; anything unlisted falls through to
# the parent implementation. The listed domain errors are rescued and
# answered with dont_acknowledge, using the error text as the reason.
#
# NOTE(review): the method name, the `message` parameter and the call
# arguments are restored from the documented signature
# `#process_message(message)`; confirm against the source file.
# NOTE(review): process_command_response does not appear in any method
# summary on this page — confirm that it is defined.
#
# @param message [Object] the incoming message
# @return [Object]
def process_message(message)
  case message
  when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue
    will_not_handle message
  when AggregatedStatusRequest
    process_aggregated_status_request message
  when CommandResponse
    process_command_response message
  when CommandRequest, StatusRequest, StatusSubscribe, StatusUnsubscribe,
       Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    handle_interface_request message
  else
    super
  end
rescue UnknownComponent, UnknownCommand, UnknownStatus, MessageRejected, MissingAttribute => e
  dont_acknowledge message, '', e.to_s
end
#process_sxl_request(message) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 129
#
# Routes an SXL-level request to the matching handler based on its class.
# Messages of any other class are ignored and nil is returned.
#
# NOTE(review): the `message` parameter and call arguments are restored from
# the documented signature; confirm against the source file.
#
# @param message [Object] the incoming request
# @return [Object, nil] result of the selected handler, or nil if none matched
def process_sxl_request(message)
  case message
  when CommandRequest
    process_command_request message
  when StatusRequest
    process_status_request message
  when StatusSubscribe
    process_status_subcribe message
  when StatusUnsubscribe
    process_status_unsubcribe message
  when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    process_alarm message
  end
end
#process_version(message) ⇒ Object
178 179 180 181 182 183 184 185 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 178
#
# Handles a Version message from the supervisor.
#
# If a version has already been determined, the message is passed to
# extraneous_version. Otherwise the core and SXL versions are checked, @site_id
# is built from the ip and port, and the version is accepted.
#
# NOTE(review): the `message` parameter and call arguments are restored from
# the documented signature; confirm against the source file.
#
# @param message [Object] the Version message
# @return [Object] result of extraneous_version or version_accepted
def process_version(message)
  return extraneous_version message if @version_determined

  check_core_version message
  check_sxl_version message
  @site_id = Supervisor.build_id_from_ip_port @ip, @port
  version_accepted message
end
#reconnect_delay? ⇒ Boolean
155 156 157 158 159 160 161 162 163 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 155
#
# Waits out the configured reconnect interval, unless reconnecting is disabled.
#
# Reads 'intervals' => 'reconnect' from the site settings. A value of :no
# means no reconnect. Otherwise the interval is logged, the logger is muted
# for this ip/port, and the task sleeps for the interval.
#
# @return [Boolean] false when the interval is :no, true after sleeping
def reconnect_delay?
  interval = @site_settings['intervals']['reconnect']
  return false if interval == :no

  log("Will try to reconnect again every #{interval} seconds...", level: :info)
  @logger.mute(@ip, @port)
  @task.sleep(interval)
  true
end
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 28
#
# Connection loop: connect, start the reader, begin the handshake, then wait
# for the reader to finish. The loop repeats while reconnect_delay? is true.
#
# - Restart mutes the logger for this ip/port and is re-raised.
# - RSMP::ConnectionError is logged at :error level, then reconnect is tried.
# - Any other StandardError is passed to distribute_error, then reconnect is
#   tried.
#
# The connection is closed and subtasks are stopped after every iteration.
#
# @return [Object]
def run
  loop do
    connect
    start_reader
    start_handshake
    wait_for_reader # run until disconnected
    break unless reconnect_delay?
  rescue Restart
    @logger.mute(@ip, @port)
    raise
  rescue RSMP::ConnectionError => e
    log(e, level: :error)
    break unless reconnect_delay?
  rescue StandardError => e
    distribute_error(e, level: :internal)
    break unless reconnect_delay?
  ensure
    close
    stop_subtasks
  end
end
#send_component_list ⇒ Object
201 202 203 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 201
#
# Sends a ComponentList message carrying the site's component list.
#
# NOTE(review): the rendered listing shows only the ComponentList construction,
# which would build the message and discard it. The `send_message` call is
# restored on the assumption that it was dropped like the other
# `message`-named identifiers on this page — confirm against the source file.
#
# @return [Object] result of send_message
def send_component_list
  send_message ComponentList.new('components' => @site.component_list)
end
#start_handshake ⇒ Object
50 51 52 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 50
#
# Opens the handshake by sending a version request with the configured site id
# and the core versions.
#
# @return [Object] result of send_version_request
def start_handshake
  site_id = @site_settings['site_id']
  send_version_request(site_id, core_versions)
end
#stop_task ⇒ Object
67 68 69 70 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 67
#
# Runs the parent stop_task, then clears @last_status_sent.
#
# @return [nil]
def stop_task
  super
  # reset the stored last-sent status
  @last_status_sent = nil
end
#timer(now) ⇒ Object
173 174 175 176 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 173
#
# Timer tick: runs the parent timer, then runs the status update timer if the
# proxy is ready.
#
# @param now [Object] current time, passed on to the parent and to
#   status_update_timer
# @return [Object, nil] result of status_update_timer, or nil when not ready
def timer(now)
  super
  return unless ready?

  status_update_timer(now)
end
#version_accepted(message) ⇒ Object
165 166 167 168 169 170 171 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 165
#
# Completes version negotiation: logs the chosen core version, starts the
# timer, acknowledges the Version message, sets @version_determined and sends
# a watchdog.
#
# NOTE(review): the `message` parameter, the `message:` log value and the
# argument to acknowledge are restored from the documented signature; confirm
# against the source file.
#
# @param message [Object] the accepted Version message
# @return [Object] result of send_watchdog
def version_accepted(message)
  log "Received Version message, using RSMP #{@core_version}", message: message, level: :log
  start_timer
  acknowledge message
  @version_determined = true
  send_watchdog
end