Class: ZZQ::Message
- Inherits:
-
Object
- Object
- ZZQ::Message
- Defined in:
- lib/zzq/message.rb
Overview
One MQTT application message as delivered to a subscriber or accepted by Client#publish / Broker#ingest.
‘Message` is independent of any particular connection: it carries every wire-level field that a PUBLISH packet does, so it can be shipped across non-MQTT transports (OMQ, NNQ, a file, a queue) via #to_wire / Message.from_wire and re-injected at the other end.
‘#ack!` is only meaningful for QoS 1/2 messages received with `auto_ack: false` — it sends the appropriate PUBACK/PUBCOMP.
Instance Attribute Summary collapse
-
#dup ⇒ Object
readonly
Returns the value of attribute dup.
-
#packet_id ⇒ Object
readonly
Returns the value of attribute packet_id.
-
#payload ⇒ Object
readonly
Returns the value of attribute payload.
-
#properties ⇒ Object
readonly
Returns the value of attribute properties.
-
#qos ⇒ Object
readonly
Returns the value of attribute qos.
-
#retain ⇒ Object
readonly
Returns the value of attribute retain.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Class Method Summary collapse
-
.from_publish(pkt, on_ack: nil) ⇒ Object
Build a Message from a decoded PUBLISH packet.
-
.from_wire(bytes) ⇒ Object
Inverse of #to_wire.
Instance Method Summary collapse
- #==(other) ⇒ Object (also: #eql?)
- #ack! ⇒ Object
- #acked? ⇒ Boolean
- #hash ⇒ Object
-
#initialize(topic:, payload:, qos: 0, retain: false, dup: false, packet_id: nil, properties: {}, on_ack: nil) ⇒ Message
constructor
A new instance of Message.
-
#to_publish ⇒ Object
Build a PUBLISH packet that represents this message.
-
#to_wire ⇒ Object
Encode this message as a self-describing byte string using the PUBLISH packet codec.
Constructor Details
#initialize(topic:, payload:, qos: 0, retain: false, dup: false, packet_id: nil, properties: {}, on_ack: nil) ⇒ Message
Returns a new instance of Message.
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/zzq/message.rb', line 20 def initialize(topic:, payload:, qos: 0, retain: false, dup: false, packet_id: nil, properties: {}, on_ack: nil) @topic = topic @payload = payload.b.freeze @qos = qos @retain = retain @dup = dup @packet_id = packet_id @properties = properties @on_ack = on_ack @acked = false end |
Instance Attribute Details
#dup ⇒ Object (readonly)
Returns the value of attribute dup.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def dup @dup end |
#packet_id ⇒ Object (readonly)
Returns the value of attribute packet_id.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def packet_id @packet_id end |
#payload ⇒ Object (readonly)
Returns the value of attribute payload.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def payload @payload end |
#properties ⇒ Object (readonly)
Returns the value of attribute properties.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def properties @properties end |
#qos ⇒ Object (readonly)
Returns the value of attribute qos.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def qos @qos end |
#retain ⇒ Object (readonly)
Returns the value of attribute retain.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def retain @retain end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
17 18 19 |
# File 'lib/zzq/message.rb', line 17 def topic @topic end |
Class Method Details
.from_publish(pkt, on_ack: nil) ⇒ Object
Build a Message from a decoded PUBLISH packet.
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/zzq/message.rb', line 82 def self.from_publish(pkt, on_ack: nil) new( topic: pkt.topic, payload: pkt.payload, qos: pkt.qos, retain: pkt.retain, dup: pkt.dup, packet_id: pkt.packet_id, properties: pkt.properties, on_ack: on_ack, ) end |
.from_wire(bytes) ⇒ Object
Inverse of #to_wire. Raises Protocol::MQTT::MalformedPacket if the bytes don’t decode as a PUBLISH packet.
58 59 60 61 62 63 |
# File 'lib/zzq/message.rb', line 58 def self.from_wire(bytes) pkt, consumed = Protocol::MQTT::Packet.decode(bytes, version: 5) raise Protocol::MQTT::MalformedPacket, "expected PUBLISH, got #{pkt.class}" unless pkt.is_a?(Protocol::MQTT::Packet::Publish) raise Protocol::MQTT::MalformedPacket, "trailing bytes after PUBLISH" if consumed != bytes.bytesize from_publish(pkt) end |
Instance Method Details
#==(other) ⇒ Object Also known as: eql?
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/zzq/message.rb', line 96 def ==(other) other.is_a?(Message) && other.topic == @topic && other.payload == @payload && other.qos == @qos && other.retain == @retain && other.dup == @dup && other.packet_id == @packet_id && other.properties == @properties end |
#ack! ⇒ Object
34 35 36 37 38 |
# File 'lib/zzq/message.rb', line 34 def ack! return if @qos.zero? || @acked @acked = true @on_ack&.call(self) end |
#acked? ⇒ Boolean
41 42 43 |
# File 'lib/zzq/message.rb', line 41 def acked? @acked end |
#hash ⇒ Object
109 110 111 |
# File 'lib/zzq/message.rb', line 109 def hash [self.class, @topic, @payload, @qos, @retain, @dup, @packet_id, @properties].hash end |
#to_publish ⇒ Object
Build a PUBLISH packet that represents this message. Used both internally (for over-the-wire transmission) and by #to_wire.
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/zzq/message.rb', line 68 def to_publish Protocol::MQTT::Packet::Publish.new( topic: @topic, payload: @payload, qos: @qos, retain: @retain, dup: @dup, packet_id: @packet_id, properties: @properties, ) end |
#to_wire ⇒ Object
Encode this message as a self-describing byte string using the PUBLISH packet codec. The output can be cross-shipped over a non-MQTT transport and reconstructed via from_wire. v5 wire format is used so the full property block round-trips.
50 51 52 53 |
# File 'lib/zzq/message.rb', line 50 def to_wire packet = to_publish packet.encode(version: 5) end |