Class: OpenC3::MqttInterface
- Defined in:
- lib/openc3/interfaces/mqtt_interface.rb
Overview
Base class for interfaces that send and receive messages over MQTT
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_target_enabled, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #save_raw_data, #scheduler, #secrets, #state, #stream_log_pair, #target_names, #tlm_target_enabled, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
-
#connect ⇒ Object
Connects the interface to its target(s).
-
#connected? ⇒ Boolean
Whether the MQTT client is connected.
- #connection_string ⇒ Object
- #details ⇒ Object
-
#disconnect ⇒ Object
Disconnects the interface from its target(s).
-
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
constructor
A new instance of MqttInterface.
- #read ⇒ Object
-
#read_interface ⇒ Object
Reads from the client.
-
#set_option(option_name, option_values) ⇒ Object
Supported Options USERNAME - Username for Mqtt Server PASSWORD - Password for Mqtt Server CERT - Public Key for Client Cert Auth KEY - Private Key for Client Cert Auth CA_FILE - Certificate Authority for Client Cert Auth (see Interface#set_option).
- #write(packet) ⇒ Object
-
#write_interface(data, extra = nil) ⇒ Object
Writes to the client.
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #post_connect, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?
Methods included from Api
#_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_available, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_details, #interface_protocol_cmd, #interface_target_disable, #interface_target_enable, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #map_target_to_router, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_details, #router_protocol_cmd, #router_target_disable, #router_target_enable, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_with_units, #unmap_target_from_interface, #unmap_target_from_router, #update_news, #update_plugin_store
Methods included from CmdLog
Constructor Details
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
Returns a new instance of MqttInterface.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 91 def initialize(hostname, port = 1883, ssl = false) super() @hostname = hostname @port = Integer(port) @ssl = ConfigParser.handle_true_false(ssl) @ack_timeout = 5.0 @username = nil @password = nil @cert = nil @key = nil @ca_file = nil @write_topics = [] @read_topics = [] # Build list of packets by topic @read_packets_by_topic = {} System.telemetry.all.each do |_target_name, target_packets| target_packets.each do |_packet_name, packet| topics = packet.['TOPIC'] topics = packet.['TOPICS'] unless topics if topics topics.each do |topic| @read_packets_by_topic[topic] = packet end end end end end |
Instance Method Details
#connect ⇒ Object
Connects the interface to its target(s)
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 126 def connect @write_topics = [] @read_topics = [] @client = MQTT::Client.new @client.ack_timeout = @ack_timeout @client.host = @hostname @client.port = @port @client.username = @username if @username @client.password = @password if @password @client.ssl = @ssl if @cert and @key @client.ssl = true @client.cert_file = @cert.path @client.key_file = @key.path end if @ca_file @client.ssl = true @client.ca_file = @ca_file.path end @client.connect @read_packets_by_topic.each do |topic, _| Logger.info "#{@name}: Subscribing to #{topic}" @client.subscribe(topic) end super() end |
#connected? ⇒ Boolean
Returns Whether the MQTT client is connected.
154 155 156 157 158 159 160 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 154 def connected? if @client return @client.connected? else return false end end |
#connection_string ⇒ Object
121 122 123 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 121 def connection_string return "#{@hostname}:#{@port} (ssl: #{@ssl})" end |
#details ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 259 def details result = super() result['hostname'] = @hostname result['port'] = @port result['ssl'] = @ssl result['ack_timeout'] = @ack_timeout result['username'] = @username result['password'] = 'Set' if @password result['cert'] = 'Set' if @cert result['key'] = 'Set' if @key result['ca_file'] = 'Set' if @ca_file result['options'].delete('PASSWORD') result['options'].delete('CERT') result['options'].delete('KEY') result['options'].delete('CA_FILE') result['read_packets_by_topic'] = @read_packets_by_topic return result end |
#disconnect ⇒ Object
Disconnects the interface from its target(s)
163 164 165 166 167 168 169 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 163 def disconnect if @client @client.disconnect @client = nil end super() end |
#read ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 171 def read packet = super() topic = @read_topics.shift return nil unless packet identified_packet = @read_packets_by_topic[topic] if identified_packet identified_packet = identified_packet.dup identified_packet.buffer = packet.buffer packet = identified_packet end packet.received_time = nil return packet end |
#read_interface ⇒ Object
Reads from the client
201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 201 def read_interface topic, data = @client.get if data.nil? or data.length <= 0 Logger.info "#{@name}: read returned nil" if data.nil? Logger.info "#{@name}: read returned 0 bytes" if not data.nil? and data.length <= 0 return nil end @read_topics << topic extra = nil read_interface_base(data, extra) return data, extra rescue IOError # Disconnected return nil end |
#set_option(option_name, option_values) ⇒ Object
Supported Options USERNAME - Username for Mqtt Server PASSWORD - Password for Mqtt Server CERT - Public Key for Client Cert Auth KEY - Private Key for Client Cert Auth CA_FILE - Certificate Authority for Client Cert Auth (see Interface#set_option)
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 232 def set_option(option_name, option_values) super(option_name, option_values) case option_name.upcase when 'ACK_TIMEOUT' @ack_timeout = Float(option_values[0]) when 'USERNAME' @username = option_values[0] when 'PASSWORD' @password = option_values[0] when 'CERT' # CERT must be given as a file @cert = Tempfile.new('cert') @cert.write(option_values[0]) @cert.close when 'KEY' # KEY must be given as a file @key = Tempfile.new('key') @key.write(option_values[0]) @key.close when 'CA_FILE' # CA_FILE must be given as a file @ca_file = Tempfile.new('ca_file') @ca_file.write(option_values[0]) @ca_file.close end end |
#write(packet) ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 185 def write(packet) @write_mutex.synchronize do topics = packet.['TOPIC'] topics = packet.['TOPICS'] unless topics if topics topics.each do |topic| @write_topics << topic super(packet) end else raise "Command packet '#{packet.target_name} #{packet.packet_name}' requires a META TOPIC or TOPICS" end end end |
#write_interface(data, extra = nil) ⇒ Object
Writes to the client
218 219 220 221 222 223 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 218 def write_interface(data, extra = nil) write_interface_base(data, extra) topic = @write_topics.shift @client.publish(topic, data) return data, extra end |