Class: RSMP::SupervisorProxy
- Includes:
- Modules::AggregatedStatus, Modules::Alarms, Modules::Commands, Modules::MessageBuffer, Modules::Status
- Defined in:
- lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/modules/message_buffer.rb,
lib/rsmp/proxy/supervisor/modules/status_updates.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb
Overview
Proxy used by sites to connect to a remote supervisor.
Defined Under Namespace
Modules: Modules
Constant Summary
Constants inherited from Proxy
Instance Attribute Summary collapse
-
#message_buffer ⇒ Object
readonly
Returns the value of attribute message_buffer.
-
#site ⇒ Object
readonly
Returns the value of attribute site.
-
#supervisor_id ⇒ Object
readonly
Returns the value of attribute supervisor_id.
Attributes inherited from Proxy
#accepted_sxls, #archive, #collector, #connection_info, #core_version, #ip, #node, #port, #rejected_sxls, #site_settings, #state, #sxl_interfaces, #sxls
Attributes included from Task
Attributes included from Distributor
Attributes included from Logging
Instance Method Summary collapse
- #acknowledged_first_ingoing(message) ⇒ Object
- #check_sxl_version(message) ⇒ Object
- #close ⇒ Object
- #component_list_acknowledged ⇒ Object
-
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
- #connect_tcp ⇒ Object
- #handle_interface_request(message) ⇒ Object
- #handshake_complete ⇒ Object
-
#initialize(options) ⇒ SupervisorProxy
constructor
A new instance of SupervisorProxy.
- #main ⇒ Object
- #process_message(message) ⇒ Object
- #process_sxl_request(message) ⇒ Object
- #process_version(message) ⇒ Object
- #reconnect_delay? ⇒ Boolean
-
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
- #run_accepted_connection ⇒ Object
- #send_component_list ⇒ Object
- #start_handshake ⇒ Object
- #stop_task ⇒ Object
- #version_accepted(message) ⇒ Object
Methods included from Modules::MessageBuffer
#buffer_message, #clone_message, #enqueue_buffered_message, #flush_buffered_message, #flush_message_buffer, #message_buffer_max_messages, #message_buffer_settings, #normalize_aggregated_status_buffer, #prepare_aggregated_status_for_buffer, #prepare_message_for_buffer, #prepare_status_update_for_buffer, #site_originated_buffer_candidate?, #status_buffer_selector?, #status_buffer_selector_matches?, #status_buffer_selectors
Methods included from Modules::AggregatedStatus
#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status
Methods included from Modules::Alarms
#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms
Methods included from Modules::Commands
#build_command_rvs, #check_required_command_arguments, #command_catalogue_item, #command_catalogue_match, #execute_commands, #lookup_catalogue_command, #mark_command_unknown, #mark_commands_undefined, #process_command_request, #required_command_argument_names, #simplify_command_requests
Methods included from Modules::Status
#add_status_subscription, #build_undefined_statuses, #fetch_status_value, #fetch_status_values, #get_status_subscribe_interval, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #prune_unbuffered_status_subscriptions, #remove_status_subscription, #rsmpify_value
Methods included from Modules::StatusUpdates
#add_status_update, #build_status_item, #build_status_list, #check_on_change_update, #check_status_subscription, #collect_component_status_updates, #current_status_value, #each_status_name, #encode_status_value, #fetch_last_sent_status, #interval_update_due?, #precomputed_status_value, #send_component_status_update, #send_status_updates, #status_item_value, #status_update_timer, #status_updates_due, #store_last_sent_status, #store_last_sent_status_item
Methods inherited from Proxy
#author, #build_sxl_interfaces, #clear, #clock, #close_socket, #close_stream, #command_codes, #command_items, #connected?, #disconnect, #disconnected?, #inspect, #log, #multiple_command_codes?, #now, #primary_sxl, #ready?, #receive_alarms?, #receive_error, #reject_multiple_command_codes, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, #sxl, #sxl_interface, #sxl_interface_for, #sxl_version, #tlc, version_meets_requirement?, #wait_for_reader
Methods included from Proxy::Modules::Tasks
#read_line, #run_reader, #run_timer, #start_reader, #start_timer, #timer
Methods included from Proxy::Modules::Versions
#check_core_version, #configured_sxls, #core_3_3?, #core_versions, #extraneous_version, #normalized_core_versions, #primary_configured_sxl, #send_version, #send_version_message, #send_version_request, #send_version_response, #site_id_items, #sxl_request_items, #version_acknowledged, #version_items, version_meets_requirement?, #version_meets_requirement?, #version_message_attributes, #version_request_attributes, #version_response_sxls
Methods included from Proxy::Modules::Receive
#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle
Methods included from Proxy::Modules::Send
#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_message, #send_message_and_collect, #with_send_ready, #write_message
Methods included from Proxy::Modules::Acknowledgements
#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged
Methods included from Proxy::Modules::Watchdogs
#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled
Methods included from Proxy::Modules::State
Methods included from Task
#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition
Methods included from Distributor
#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution
Methods included from Logging
#author, #initialize_logging, #log
Constructor Details
#initialize(options) ⇒ SupervisorProxy
Returns a new instance of SupervisorProxy.
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 14
# Sets up a proxy for one supervisor connection.
#
# @param options [Hash] settings; this method reads :site, :ip and :port,
#   and forwards the whole hash (plus node: options[:site]) to super.
def initialize(options)
  super(options.merge(node: options[:site]))
  @site = options[:site]
  # work on a copy so later changes do not touch the site's own settings object
  @site_settings = @site.site_settings.clone
  @ip = options[:ip]
  @port = options[:port]
  @status_subscriptions = {}
  @sxls = configured_sxls
  # start out with every configured SXL accepted and none rejected
  @accepted_sxls = @sxls.dup
  @rejected_sxls = []
  @synthetic_id = Supervisor.build_id_from_ip_port @ip, @port
  @message_buffer = []
end
Instance Attribute Details
#message_buffer ⇒ Object (readonly)
Returns the value of attribute message_buffer.
12 13 14 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12
# Read-only accessor.
#
# @return [Object] the value of @message_buffer
def message_buffer
  @message_buffer
end
#site ⇒ Object (readonly)
Returns the value of attribute site.
12 13 14 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12
# Read-only accessor.
#
# @return [Object] the value of @site
def site
  @site
end
#supervisor_id ⇒ Object (readonly)
Returns the value of attribute supervisor_id.
12 13 14 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12
# Read-only accessor.
#
# @return [Object] the value of @supervisor_id
def supervisor_id
  @supervisor_id
end
Instance Method Details
#acknowledged_first_ingoing(message) ⇒ Object
171 172 173 174 175 176 177 178 179 180 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 171
# Continues the handshake when the acknowledged message is a Watchdog:
# sends the component list on core 3.3, otherwise completes the handshake.
# Any other message type is ignored.
#
# @param message [#type] the message that was acknowledged
def acknowledged_first_ingoing(message)
  case message.type
  when 'Watchdog'
    if core_3_3?
      send_component_list
    else
      handshake_complete
    end
  end
end
#check_sxl_version(message) ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 209
# Decides which SXLs are accepted and rejected, then builds the SXL interfaces.
#
# On core 3.3 the message's SXL items are split on their 'rejected' key and
# 'receiveAlarms' is read from the message attributes (defaulting to true).
# Otherwise the primary configured SXL is the only accepted one.
#
# @param message [#sxls, #attributes] the received Version message
# @raise [HandshakeError] on the legacy path when no primary SXL is configured
def check_sxl_version(message)
  if core_3_3?
    # partition returns [matching, non-matching], i.e. rejected first
    @rejected_sxls, @accepted_sxls = message.sxls.partition { |item| item['rejected'] }
    @receive_alarms = message.attributes.fetch('receiveAlarms', true)
  else
    primary = primary_configured_sxl
    raise HandshakeError, 'Legacy Version response received, but no SXL is configured' unless primary

    @accepted_sxls = [primary]
    @rejected_sxls = []
  end
  build_sxl_interfaces
end
#close ⇒ Object
75 76 77 78 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 75
# Prunes unbuffered status subscriptions, then defers to the inherited close.
def close
  prune_unbuffered_status_subscriptions
  super
end
#component_list_acknowledged ⇒ Object
227 228 229 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 227
# Completes the handshake.
def component_list_acknowledged
  handshake_complete
end
#connect ⇒ Object
Connect to the supervisor and initiate the handshake.
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 81
# Opens the TCP connection to the supervisor at @ip:@port and logs progress.
# Sets the state to :connecting before the attempt and unmutes the logger
# for this ip/port once connect_tcp has returned.
#
# @raise [ConnectionError] wrapping any SystemCallError (with its errno) or
#   other StandardError raised while connecting
def connect
  log "Connecting to supervisor at #{@ip}:#{@port}", level: :info
  self.state = :connecting
  connect_tcp
  @logger.unmute @ip, @port
  log "Connected to supervisor at #{@ip}:#{@port}", level: :info
rescue SystemCallError => e
  raise ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}"
rescue StandardError => e
  raise ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}"
end
#connect_tcp ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 98
# Creates the TCP endpoint, connects within a timeout, then wraps the socket
# in a buffered stream and an RSMP protocol object. Sets state to :connected.
#
# The timeout is site setting timeouts/connect (default 1.1); an optional
# pause of intervals/after_connect is taken after connecting.
#
# @raise [Errno::ECONNREFUSED] re-raised after logging a warning
def connect_tcp
  @endpoint = IO::Endpoint.tcp(@ip, @port)

  # this timeout is a workaround for connect hanging on windows if the other side is not present yet
  timeout = @site_settings.dig('timeouts', 'connect') || 1.1
  task.with_timeout timeout do
    @socket = @endpoint.connect
  end
  delay = @site_settings.dig('intervals', 'after_connect')
  task.sleep delay if delay

  @stream = IO::Stream::Buffered.new(@socket)
  @protocol = RSMP::Protocol.new(@stream) # rsmp messages are json terminated with a form-feed
  self.state = :connected
rescue Errno::ECONNREFUSED => e # rescue to avoid log output
  log 'Connection refused', level: :warning
  raise e
end
#handle_interface_request(message) ⇒ Object
150 151 152 153 154 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 150 def handle_interface_request() interface = sxl_interface_for interface. interface. end |
# NOTE(review): this listing is damaged by text extraction. Per the method
# summary the signature is handle_interface_request(message). The body looks
# up an SXL interface via sxl_interface_for and then makes two calls on it,
# but the names of those calls (and their arguments) are missing here.
# Presumably `message` is passed to each - confirm against the source file
# before relying on this listing.
#handshake_complete ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 117
# Marks the connection as ready once the handshake has finished.
#
# Logs the core version and accepted SXLs, sets state to :ready and starts
# the watchdog. If site setting 'send_after_connect' is set, sends all
# aggregated statuses, and active alarms when receive_alarms? is true.
# Finally defers to the inherited implementation.
def handshake_complete
  sxl_summary = accepted_sxls.map { |item| "#{item['name']} #{item['version']}" }.join(', ')
  log "Connection to supervisor established, using core #{@core_version}, SXLs [#{sxl_summary}]",
      level: :info
  self.state = :ready
  start_watchdog
  if @site_settings['send_after_connect']
    send_all_aggregated_status
    send_active_alarms if receive_alarms?
  end
  super
end
#main ⇒ Object
231 232 233 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 231
# Delegates to the owning site.
#
# @return [Object] whatever @site.main returns
def main
  @site.main
end
#process_message(message) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 131
# Dispatches an incoming message according to its class.
#
# Message classes this side does not handle go to will_not_handle;
# SXL-related requests go to handle_interface_request; anything not listed
# here falls through to the inherited implementation.
#
# NOTE(review): the argument passed to each handler was lost from this
# listing and has been restored as `message` - confirm against the source.
#
# @param message [Object] the incoming message
def process_message(message)
  case message
  when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue
    will_not_handle message
  when AggregatedStatusRequest
    process_aggregated_status_request message
  when CommandResponse
    process_command_response message
  when CommandRequest, StatusRequest, StatusSubscribe, StatusUnsubscribe,
       Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    handle_interface_request message
  else
    super
  end
rescue UnknownComponent, UnknownCommand, UnknownStatus,
       MessageRejected, MissingAttribute => e
  # answer with a negative acknowledgement carrying the error text
  dont_acknowledge message, '', e.to_s
end
#process_sxl_request(message) ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 156
# Routes an SXL-related request to the handler for its class.
# Messages of any other class are ignored and nil is returned.
#
# NOTE(review): the argument passed to each handler was lost from this
# listing and has been restored as `message` - confirm against the source.
#
# @param message [Object] the incoming request
def process_sxl_request(message)
  case message
  when CommandRequest
    process_command_request message
  when StatusRequest
    process_status_request message
  when StatusSubscribe
    process_status_subcribe message
  when StatusUnsubscribe
    process_status_unsubcribe message
  when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    process_alarm message
  end
end
#process_version(message) ⇒ Object
200 201 202 203 204 205 206 207 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 200
# Handles a Version message from the supervisor.
#
# A Version received after the version is already determined is passed to
# extraneous_version. Otherwise the core and SXL versions are checked,
# @site_id is derived from @ip and @port, and the version is accepted.
#
# NOTE(review): the argument passed to each helper was lost from this
# listing and has been restored as `message` - confirm against the source.
#
# @param message [Object] the received Version message
def process_version(message)
  return extraneous_version(message) if @version_determined

  check_core_version message
  check_sxl_version message
  @site_id = Supervisor.build_id_from_ip_port @ip, @port
  version_accepted message
end
#reconnect_delay? ⇒ Boolean
182 183 184 185 186 187 188 189 190 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 182
# Waits for the configured reconnect interval, if reconnecting is enabled.
#
# Reads site setting intervals/reconnect. When it is :no nothing happens.
# Otherwise the interval is logged, the logger is muted for this ip/port,
# and the task sleeps for the interval.
#
# @return [Boolean] false when reconnecting is disabled, true after sleeping
def reconnect_delay?
  interval = @site_settings['intervals']['reconnect']
  return false if interval == :no

  log "Will try to reconnect again every #{interval} seconds...", level: :info
  @logger.mute @ip, @port
  @task.sleep interval
  true
end
#run ⇒ Object
Handle communication; if disconnected, try to reconnect.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 30
# Runs the proxy: connect, handshake, read until disconnected, then retry.
#
# If @protocol is already set, run_accepted_connection is used instead and
# no reconnect loop is entered. Inside the loop, connection and other
# standard errors are reported and followed by a reconnect delay; the loop
# ends when reconnect_delay? returns false. Restart mutes the logger for
# this ip/port and is re-raised. Every pass closes the connection and stops
# subtasks, whether it succeeded or failed.
def run
  if @protocol
    run_accepted_connection
    return
  end

  loop do
    connect
    start_reader
    start_handshake
    wait_for_reader # run until disconnected
    break unless reconnect_delay?
  rescue Restart
    @logger.mute @ip, @port
    raise
  rescue RSMP::ConnectionError => e
    log e, level: :error
    break unless reconnect_delay?
  rescue StandardError => e
    distribute_error e, level: :internal
    break unless reconnect_delay?
  ensure
    close
    stop_subtasks
  end
end
#run_accepted_connection ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 57
# Runs a single session on a connection that already exists: sets state to
# :connected, starts the reader and handshake, and waits for the reader.
# Connection errors are logged and other standard errors distributed; the
# connection is always closed and subtasks stopped afterwards.
def run_accepted_connection
  self.state = :connected
  start_reader
  start_handshake
  wait_for_reader
rescue RSMP::ConnectionError => e
  log e, level: :error
rescue StandardError => e
  distribute_error e, level: :internal
ensure
  close
  stop_subtasks
end
#send_component_list ⇒ Object
223 224 225 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 223 def send_component_list ComponentList.new('components' => @site.component_list) end |
# Builds a ComponentList message whose 'components' entry is
# @site.component_list.
# NOTE(review): as shown, the new ComponentList is never passed to anything.
# Other listings on this page lost identifiers during text extraction, so a
# wrapping call (presumably one that sends the message) may be missing here -
# confirm against the source file.
#start_handshake ⇒ Object
71 72 73 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 71
# Begins the handshake by sending a version request carrying the configured
# site id and the core versions.
def start_handshake
  send_version_request @site_settings['site_id'], core_versions
end
#stop_task ⇒ Object
93 94 95 96 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 93
# Runs the inherited stop_task, then clears @last_status_sent.
def stop_task
  super
  @last_status_sent = nil
end
#version_accepted(message) ⇒ Object
192 193 194 195 196 197 198 |
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 192
# Finalises version negotiation: logs the core version, starts the timer,
# acknowledges the message, records that the version is determined and
# sends a watchdog.
#
# NOTE(review): the `message` arguments were lost from this listing and have
# been restored - confirm against the source.
#
# @param message [Object] the accepted Version message
def version_accepted(message)
  log "Received Version message, using RSMP #{@core_version}", message: message, level: :log
  start_timer
  acknowledge message
  @version_determined = true
  send_watchdog
end