Class: RSMP::SupervisorProxy

Inherits:
Proxy
  • Object
show all
Includes:
Modules::AggregatedStatus, Modules::Alarms, Modules::Commands, Modules::MessageBuffer, Modules::Status
Defined in:
lib/rsmp/proxy/supervisor/supervisor_proxy.rb,
lib/rsmp/proxy/supervisor/modules/alarms.rb,
lib/rsmp/proxy/supervisor/modules/status.rb,
lib/rsmp/proxy/supervisor/modules/commands.rb,
lib/rsmp/proxy/supervisor/modules/message_buffer.rb,
lib/rsmp/proxy/supervisor/modules/status_updates.rb,
lib/rsmp/proxy/supervisor/modules/aggregated_status.rb

Overview

Proxy used by sites to connect to a remote supervisor.

Defined Under Namespace

Modules: Modules

Constant Summary

Constants inherited from Proxy

Proxy::WRAPPING_DELIMITER

Instance Attribute Summary collapse

Attributes inherited from Proxy

#accepted_sxls, #archive, #collector, #connection_info, #core_version, #ip, #node, #port, #rejected_sxls, #site_settings, #state, #sxl_interfaces, #sxls

Attributes included from Task

#task

Attributes included from Distributor

#receivers

Attributes included from Logging

#archive, #logger

Instance Method Summary collapse

Methods included from Modules::MessageBuffer

#buffer_message, #clone_message, #enqueue_buffered_message, #flush_buffered_message, #flush_message_buffer, #message_buffer_max_messages, #message_buffer_settings, #normalize_aggregated_status_buffer, #prepare_aggregated_status_for_buffer, #prepare_message_for_buffer, #prepare_status_update_for_buffer, #site_originated_buffer_candidate?, #status_buffer_selector?, #status_buffer_selector_matches?, #status_buffer_selectors

Methods included from Modules::AggregatedStatus

#process_aggregated_status_request, #send_aggregated_status, #send_all_aggregated_status

Methods included from Modules::Alarms

#handle_alarm_acknowledge, #handle_alarm_request, #handle_alarm_resume, #handle_alarm_suspend, #process_alarm, #send_active_alarms

Methods included from Modules::Commands

#build_command_rvs, #check_required_command_arguments, #command_catalogue_item, #command_catalogue_match, #execute_commands, #lookup_catalogue_command, #mark_command_unknown, #mark_commands_undefined, #process_command_request, #required_command_argument_names, #simplify_command_requests

Methods included from Modules::Status

#add_status_subscription, #build_undefined_statuses, #fetch_status_value, #fetch_status_values, #get_status_subscribe_interval, #process_status_request, #process_status_subcribe, #process_status_unsubcribe, #prune_unbuffered_status_subscriptions, #remove_status_subscription, #rsmpify_value

Methods included from Modules::StatusUpdates

#add_status_update, #build_status_item, #build_status_list, #check_on_change_update, #check_status_subscription, #collect_component_status_updates, #current_status_value, #each_status_name, #encode_status_value, #fetch_last_sent_status, #interval_update_due?, #precomputed_status_value, #send_component_status_update, #send_status_updates, #status_item_value, #status_update_timer, #status_updates_due, #store_last_sent_status, #store_last_sent_status_item

Methods inherited from Proxy

#author, #build_sxl_interfaces, #clear, #clock, #close_socket, #close_stream, #command_codes, #command_items, #connected?, #disconnect, #disconnected?, #inspect, #log, #multiple_command_codes?, #now, #primary_sxl, #ready?, #receive_alarms?, #receive_error, #reject_multiple_command_codes, #revive, #schemas, #setup, #state_changed, #stop_reader, #stop_subtasks, #stop_timer, #sxl, #sxl_interface, #sxl_interface_for, #sxl_version, #tlc, version_meets_requirement?, #wait_for_reader

Methods included from Proxy::Modules::Tasks

#read_line, #run_reader, #run_timer, #start_reader, #start_timer, #timer

Methods included from Proxy::Modules::Versions

#check_core_version, #configured_sxls, #core_3_3?, #core_versions, #extraneous_version, #normalized_core_versions, #primary_configured_sxl, #send_version, #send_version_message, #send_version_request, #send_version_response, #site_id_items, #sxl_request_items, #version_acknowledged, #version_items, version_meets_requirement?, #version_meets_requirement?, #version_message_attributes, #version_request_attributes, #version_response_sxls

Methods included from Proxy::Modules::Receive

#expect_version_message, #handle_fatal_error, #handle_invalid_message, #handle_invalid_packet, #handle_malformed_message, #handle_schema_error, #process_deferred, #process_packet, #should_validate_ingoing_message?, #verify_sequence, #will_not_handle

Methods included from Proxy::Modules::Send

#apply_nts_message_attributes, #buffer_message, #handle_send_schema_error, #log_send, #send_message, #send_message_and_collect, #with_send_ready, #write_message

Methods included from Proxy::Modules::Acknowledgements

#acknowledge, #acknowledged_first_outgoing, #check_ack_timeout, #check_ingoing_acknowledged, #check_outgoing_acknowledged, #dont_acknowledge, #dont_expect_acknowledgement, #expect_acknowledgement, #find_original_for_message, #log_acknowledgement_for_original, #log_acknowledgement_for_unknown, #process_ack, #process_not_ack, #status_subscribe_acknowledged

Methods included from Proxy::Modules::Watchdogs

#check_watchdog_timeout, #process_watchdog, #send_watchdog, #start_watchdog, #stop_watchdog, #watchdog_send_timer, #with_watchdog_disabled

Methods included from Proxy::Modules::State

#wait_for_state

Methods included from Task

#initialize_task, #restart, #start, #stop, #stop_subtasks, #task_status, #wait, #wait_for_condition

Methods included from Distributor

#add_receiver, #clear_deferred_distribution, #distribute, #distribute_error, #distribute_immediately, #distribute_queued, #initialize_distributor, #inspect, #remove_receiver, #with_deferred_distribution

Methods included from Logging

#author, #initialize_logging, #log

Constructor Details

#initialize(options) ⇒ SupervisorProxy

Returns a new instance of SupervisorProxy.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 14

# Sets up the proxy a site uses for a connection to a supervisor.
#
# The options are forwarded to the superclass with :node set to the site. The proxy then
# stores the site, a clone of the site's settings, the supervisor address, the configured
# SXLs (initially all treated as accepted), and empty status subscriptions and message buffer.
#
# @param options [Hash] :site is required; :ip and :port give the supervisor address
def initialize(options)
  site = options[:site]
  super(options.merge(node: site))
  @site = site
  @site_settings = site.site_settings.clone
  @ip = options[:ip]
  @port = options[:port]
  @sxls = configured_sxls
  @accepted_sxls = @sxls.dup
  @rejected_sxls = []
  @status_subscriptions = {}
  @message_buffer = []
  @synthetic_id = Supervisor.build_id_from_ip_port(@ip, @port)
end

Instance Attribute Details

#message_buffer ⇒ Object (readonly)

Returns the value of attribute message_buffer.



12
13
14
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12

# Reader for @message_buffer, which #initialize sets to an empty Array.
# NOTE(review): the "(readonly)" marker suggests this is generated by attr_reader — confirm in the source file.
def message_buffer
  @message_buffer
end

#site ⇒ Object (readonly)

Returns the value of attribute site.



12
13
14
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12

# Reader for @site, which #initialize assigns from options[:site].
# NOTE(review): the "(readonly)" marker suggests this is generated by attr_reader — confirm in the source file.
def site
  @site
end

#supervisor_id ⇒ Object (readonly)

Returns the value of attribute supervisor_id.



12
13
14
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 12

# Reader for @supervisor_id.
# NOTE(review): none of the methods shown here assign @supervisor_id (#initialize sets
# @synthetic_id and #process_version sets @site_id) — confirm where, or whether, it is set.
def supervisor_id
  @supervisor_id
end

Instance Method Details

#acknowledged_first_ingoing(message) ⇒ Object



171
172
173
174
175
176
177
178
179
180
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 171

# Continues the handshake when +message+ is a Watchdog; any other message type is ignored.
#
# With core 3.3 the component list is sent next, otherwise the handshake is completed directly.
#
# @param message [#type] message whose type string is inspected
# @return [Object, nil] result of the step taken, or nil for non-Watchdog messages
def acknowledged_first_ingoing(message)
  return unless message.type == 'Watchdog'

  core_3_3? ? send_component_list : handshake_complete
end

#check_sxl_version(message) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 209

# Works out which SXLs are in effect after a Version response, then rebuilds the SXL interfaces.
#
# With core 3.3 the SXL items of the message are split on their 'rejected' key, and the
# 'receiveAlarms' attribute is stored (true when absent). Otherwise only the primary
# configured SXL is accepted and nothing is recorded as rejected.
#
# @param message [#sxls, #attributes] the received Version message
# @raise [HandshakeError] in the non-3.3 case when no primary SXL is configured
def check_sxl_version(message)
  if core_3_3?
    rejected, accepted = message.sxls.partition { |sxl| sxl['rejected'] }
    @rejected_sxls = rejected
    @accepted_sxls = accepted
    @receive_alarms = message.attributes.fetch('receiveAlarms', true)
  else
    legacy_sxl = primary_configured_sxl
    raise HandshakeError, 'Legacy Version response received, but no SXL is configured' unless legacy_sxl

    @accepted_sxls = [legacy_sxl]
    @rejected_sxls = []
  end
  build_sxl_interfaces
end

#close ⇒ Object



75
76
77
78
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 75

# Prunes unbuffered status subscriptions first, then runs the inherited #close.
def close
  prune_unbuffered_status_subscriptions
  super
end

#component_list_acknowledged ⇒ Object



227
228
229
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 227

# Completes the handshake.
# NOTE(review): presumably called once the supervisor has acknowledged the ComponentList
# sent by #send_component_list — confirm against the caller.
def component_list_acknowledged
  handshake_complete
end

#connect ⇒ Object

Connect to the supervisor. The handshake itself is started afterwards by #run, via #start_handshake.



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 81

# Opens the connection to the supervisor at @ip:@port.
#
# Logs the attempt, moves to the :connecting state, establishes the TCP connection,
# unmutes logging for the address and logs success.
#
# @raise [ConnectionError] wrapping any SystemCallError (with its errno) or other
#   StandardError raised along the way
def connect
  log("Connecting to supervisor at #{@ip}:#{@port}", level: :info)
  self.state = :connecting
  connect_tcp
  @logger.unmute(@ip, @port)
  log("Connected to supervisor at #{@ip}:#{@port}", level: :info)
rescue SystemCallError => e
  # system-level failures keep their errno in the message
  raise(ConnectionError, "Could not connect to supervisor at #{@ip}:#{@port}: Errno #{e.errno} #{e}")
rescue StandardError => e
  raise(ConnectionError, "Error while connecting to supervisor at #{@ip}:#{@port}: #{e}")
end

#connect_tcp ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 98

# Establishes the TCP connection to @ip:@port and wraps the socket in a buffered stream
# and an RSMP protocol object, then moves to the :connected state.
#
# The connect attempt is limited by the 'timeouts' → 'connect' setting (1.1 when unset).
# If an 'intervals' → 'after_connect' setting is present, the task sleeps for that long
# after connecting.
#
# @raise [Errno::ECONNREFUSED] re-raised after logging a warning
def connect_tcp
  @endpoint = IO::Endpoint.tcp(@ip, @port)

  # this timeout is a workaround for connect hanging on windows if the other side is not present yet
  connect_timeout = @site_settings.dig('timeouts', 'connect') || 1.1
  task.with_timeout(connect_timeout) { @socket = @endpoint.connect }

  pause = @site_settings.dig('intervals', 'after_connect')
  task.sleep(pause) if pause

  @stream = IO::Stream::Buffered.new(@socket)
  @protocol = RSMP::Protocol.new(@stream) # rsmp messages are json terminated with a form-feed
  self.state = :connected
rescue Errno::ECONNREFUSED => e # rescue to avoid log output
  log('Connection refused', level: :warning)
  raise e
end

#handle_interface_request(message) ⇒ Object



150
151
152
153
154
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 150

# Hands +message+ to the SXL interface responsible for it: the interface first validates
# the message and then processes it.
#
# @param message [Object] the incoming request
# @return [Object] whatever the interface's #process_message returns
def handle_interface_request(message)
  sxl = sxl_interface_for(message)
  sxl.validate_message!(message)
  sxl.process_message(message)
end

#handshake_complete ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 117

# Finishes the handshake: logs the negotiated core version and accepted SXLs, moves to the
# :ready state and starts the watchdog. When the 'send_after_connect' setting is truthy,
# all aggregated status is sent, followed by active alarms if the supervisor receives alarms.
# Finally the message buffer is flushed and the inherited #handshake_complete runs.
def handshake_complete
  sxl_summary = accepted_sxls.map { |item| "#{item['name']} #{item['version']}" }.join(', ')
  log(
    "Connection to supervisor established, using core #{@core_version}, SXLs [#{sxl_summary}]",
    level: :info
  )
  self.state = :ready
  start_watchdog

  send_initial = @site_settings['send_after_connect']
  send_all_aggregated_status if send_initial
  send_active_alarms if send_initial && receive_alarms?

  flush_message_buffer
  super
end

#main ⇒ Object



231
232
233
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 231

# Delegates to the #main of the site this proxy belongs to (@site).
def main
  @site.main
end

#process_message(message) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 131

# Dispatches an incoming message according to its class.
#
# Status responses/updates, aggregated status and alarm issues go to #will_not_handle.
# Aggregated status requests and command responses have their own processors. Command,
# status and the remaining alarm messages go through #handle_interface_request.
# Everything else is left to the inherited #process_message.
#
# The branch order matters if message classes inherit from one another, so it is kept
# exactly as is.
#
# If processing raises one of the rescued error classes, the message is passed to
# #dont_acknowledge with the error text as the reason.
#
# @param message [Object] the received message
def process_message(message)
  case message
  when StatusResponse, StatusUpdate, AggregatedStatus, AlarmIssue then will_not_handle(message)
  when AggregatedStatusRequest then process_aggregated_status_request(message)
  when CommandResponse then process_command_response(message)
  when CommandRequest, StatusRequest, StatusSubscribe, StatusUnsubscribe,
       Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest
    handle_interface_request(message)
  else
    super
  end
rescue UnknownComponent, UnknownCommand, UnknownStatus, MessageRejected, MissingAttribute => e
  dont_acknowledge(message, '', e.to_s)
end

#process_sxl_request(message) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 156

# Routes an SXL-level request to the matching processor, chosen by the message class.
# All alarm-related messages share #process_alarm. Messages of any other class are
# ignored and nil is returned.
#
# @param message [Object] the request to process
# @return [Object, nil] the processor's result, or nil when no branch matches
def process_sxl_request(message)
  case message
  when CommandRequest then process_command_request(message)
  when StatusRequest then process_status_request(message)
  when StatusSubscribe then process_status_subcribe(message)
  when StatusUnsubscribe then process_status_unsubcribe(message)
  when Alarm, AlarmAcknowledged, AlarmSuspend, AlarmResume, AlarmRequest then process_alarm(message)
  end
end

#process_version(message) ⇒ Object



200
201
202
203
204
205
206
207
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 200

# Handles a Version message from the supervisor.
#
# Once a version has already been determined, the message is passed to #extraneous_version.
# Otherwise the core and SXL versions are checked, @site_id is derived from the
# supervisor's ip and port, and the version is accepted.
#
# @param message [Object] the received Version message
def process_version(message)
  return extraneous_version(message) if @version_determined

  check_core_version(message)
  check_sxl_version(message)
  @site_id = Supervisor.build_id_from_ip_port(@ip, @port)
  version_accepted(message)
end

#reconnect_delay? ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
185
186
187
188
189
190
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 182

# Waits out the reconnect interval, unless reconnecting is switched off.
#
# When the 'intervals' → 'reconnect' setting is :no, nothing happens. Otherwise the
# interval is logged, logging for the supervisor address is muted and the task sleeps
# for the interval.
#
# @return [Boolean] false when reconnecting is disabled, true after the delay
def reconnect_delay?
  interval = @site_settings['intervals']['reconnect']
  return false if interval == :no

  log("Will try to reconnect again every #{interval} seconds...", level: :info)
  @logger.mute(@ip, @port)
  @task.sleep(interval)
  true
end

#run ⇒ Object

Handle communication with the supervisor; if disconnected, try to reconnect.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 30

# Runs the proxy until the connection ends and no further reconnect attempt is made.
#
# If @protocol is already set, the existing connection is served once through
# #run_accepted_connection. Otherwise each loop pass connects, starts the reader,
# starts the handshake and waits for the reader; #reconnect_delay? then decides
# whether to wait and go around again or to leave the loop.
def run
  if @protocol
    run_accepted_connection
    return
  end

  loop do
    connect
    start_reader
    start_handshake
    wait_for_reader # run until disconnected
    break unless reconnect_delay?
  rescue Restart
    # mute logging for this address and let the restart propagate to the caller
    @logger.mute @ip, @port
    raise
  rescue RSMP::ConnectionError => e
    # connection problems are logged; the loop continues only if reconnecting is enabled
    log e, level: :error
    break unless reconnect_delay?
  rescue StandardError => e
    # any other error is handed to #distribute_error; the loop continues likewise
    distribute_error e, level: :internal
    break unless reconnect_delay?
  ensure
    # runs at the end of every pass, whether it ended normally, by break or by an error
    close
    stop_subtasks
  end
end

#run_accepted_connection ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 57

# Serves a connection that already exists: moves to the :connected state, starts the
# reader and the handshake, and waits for the reader. There is no reconnect loop here.
#
# Connection errors are logged, other errors go to #distribute_error. In every case the
# proxy is closed and its subtasks are stopped afterwards.
def run_accepted_connection
  self.state = :connected
  start_reader
  start_handshake
  wait_for_reader
rescue RSMP::ConnectionError => e
  log(e, level: :error)
rescue StandardError => e
  distribute_error(e, level: :internal)
ensure
  close
  stop_subtasks
end

#send_component_list ⇒ Object



223
224
225
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 223

# Sends a ComponentList message whose 'components' attribute is the site's component list.
#
# @return [Object] whatever #send_message returns
def send_component_list
  components = @site.component_list
  send_message(ComponentList.new('components' => components))
end

#start_handshake ⇒ Object



71
72
73
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 71

# Starts the handshake by sending a version request that carries the 'site_id' setting
# and the core versions.
#
# @return [Object] whatever #send_version_request returns
def start_handshake
  site_id = @site_settings['site_id']
  send_version_request(site_id, core_versions)
end

#stop_task ⇒ Object



93
94
95
96
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 93

# Runs the inherited #stop_task, then clears @last_status_sent.
# NOTE(review): presumably so that no previously sent status values are remembered the
# next time the task runs — confirm against the status update code.
def stop_task
  super
  @last_status_sent = nil
end

#version_accepted(message) ⇒ Object



192
193
194
195
196
197
198
# File 'lib/rsmp/proxy/supervisor/supervisor_proxy.rb', line 192

# Called once the supervisor's Version message has passed the version checks: logs the
# core version in use, starts the timer, acknowledges the message, records that the
# version has been determined, and sends a watchdog.
#
# @param message [Object] the accepted Version message
# @return [Object] whatever #send_watchdog returns
def version_accepted(message)
  log("Received Version message, using RSMP #{@core_version}", message: message, level: :log)
  start_timer
  acknowledge(message)
  @version_determined = true
  send_watchdog
end