Class: OpenC3::MqttStream
- Defined in:
- lib/openc3/streams/mqtt_stream.rb
Instance Attribute Summary collapse
-
#ca_file ⇒ Object
Returns the value of attribute ca_file.
-
#cert ⇒ Object
Returns the value of attribute cert.
-
#hostname ⇒ Object
readonly
Returns the value of attribute hostname.
-
#key ⇒ Object
Returns the value of attribute key.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#read_topic ⇒ Object
readonly
Returns the value of attribute read_topic.
-
#ssl ⇒ Object
readonly
Returns the value of attribute ssl.
-
#username ⇒ Object
Returns the value of attribute username.
-
#write_topic ⇒ Object
readonly
Returns the value of attribute write_topic.
Instance Method Summary collapse
-
#connect ⇒ Object
Connect the stream.
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
-
#initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil, ack_timeout = 5) ⇒ MqttStream
constructor
A new instance of MqttStream.
-
#read ⇒ String
Returns a binary string of data from the read_topic.
- #write(data) ⇒ Object
Methods inherited from Stream
Constructor Details
#initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil, ack_timeout = 5) ⇒ MqttStream
Returns a new instance of MqttStream.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 31 def initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil, ack_timeout = 5) super() @hostname = hostname @port = Integer(port) @ssl = ConfigParser.handle_true_false(ssl) @write_topic = ConfigParser.handle_nil(write_topic) @read_topic = ConfigParser.handle_nil(read_topic) @ack_timeout = Float(ack_timeout) @username = nil @password = nil @cert = nil @key = nil @ca_file = nil # Mutex on write is needed to protect from commands coming in from more than one tool @write_mutex = Mutex.new end |
Instance Attribute Details
#ca_file ⇒ Object
Returns the value of attribute ca_file.
29 30 31 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 29 def ca_file @ca_file end |
#cert ⇒ Object
Returns the value of attribute cert.
27 28 29 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 27 def cert @cert end |
#hostname ⇒ Object (readonly)
Returns the value of attribute hostname.
20 21 22 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 20 def hostname @hostname end |
#key ⇒ Object
Returns the value of attribute key.
28 29 30 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 28 def key @key end |
#password ⇒ Object
Returns the value of attribute password.
26 27 28 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 26 def password @password end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
21 22 23 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 21 def port @port end |
#read_topic ⇒ Object (readonly)
Returns the value of attribute read_topic.
24 25 26 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 24 def read_topic @read_topic end |
#ssl ⇒ Object (readonly)
Returns the value of attribute ssl.
22 23 24 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 22 def ssl @ssl end |
#username ⇒ Object
Returns the value of attribute username.
25 26 27 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 25 def username @username end |
#write_topic ⇒ Object (readonly)
Returns the value of attribute write_topic.
23 24 25 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 23 def write_topic @write_topic end |
Instance Method Details
#connect ⇒ Object
Connect the stream
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 52 def connect @client = MQTT::Client.new @client.ack_timeout = @ack_timeout @client.host = @hostname @client.port = @port @client.ssl = @ssl @client.username = @username if @username @client.password = @password if @password if @cert and @key @client.ssl = true @client.cert_file = @cert.path @client.key_file = @key.path end if @ca_file @client.ssl = true @client.ca_file = @ca_file.path end @client.connect @client.subscribe(@read_topic) if @read_topic end |
#connected? ⇒ Boolean
73 74 75 76 77 78 79 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 73 def connected? if @client return @client.connected? else return false end end |
#disconnect ⇒ Object
81 82 83 84 85 86 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 81 def disconnect if @client @client.disconnect @client = nil end end |
#read ⇒ String
Returns a binary string of data from the read_topic
89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 89 def read raise "Attempt to read from write only stream" unless @read_topic # No read mutex is needed because reads happen serially _, data = @client.get if data.nil? or data.length <= 0 Logger.info "MqttStream: read returned nil" if data.nil? Logger.info "MqttStream: read returned 0 bytes" if not data.nil? and data.length <= 0 return nil end return data end |
#write(data) ⇒ Object
104 105 106 107 108 109 110 |
# File 'lib/openc3/streams/mqtt_stream.rb', line 104 def write(data) raise "Attempt to write to read only stream" unless @write_topic @write_mutex.synchronize do @client.publish(@write_topic, data) end end |