Class: Protocol::MQTT::Packet::Subscribe
- Inherits: Protocol::MQTT::Packet
  - Object
  - Protocol::MQTT::Packet
  - Protocol::MQTT::Packet::Subscribe
- Defined in: lib/protocol/mqtt/packet/subscribe.rb
Overview
SUBSCRIBE (§3.8). Fixed header flags nibble is 0b0010 (MQTT-3.8.1-1).
Variable header: packet_id (u16), followed in v5 only by a property block. Payload: one or more subscription entries, each a topic filter (utf8) followed by an options byte.
Options byte layout:
* bits 0..1 — requested QoS (0..2)
* bit 2 — No Local (v5 only)
* bit 3 — Retain As Published (v5 only)
* bits 4..5 — Retain Handling (v5 only; 0/1/2)
* bits 6..7 — reserved (must be 0)
Each filters entry is a Hash:
{ filter:, qos: 0, no_local: false, retain_as_published: false,
retain_handling: 0 }
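The bit layout above can be sketched as a pair of stand-alone helpers. The method names `pack_subscribe_options` and `unpack_subscribe_options` are hypothetical and are not part of Protocol::MQTT; they only illustrate how the fields of one filter entry map onto the options byte.

```ruby
# Hypothetical helpers, not part of Protocol::MQTT: pack and unpack the
# SUBSCRIBE options byte following the bit layout listed above.
def pack_subscribe_options(qos: 0, no_local: false, retain_as_published: false, retain_handling: 0)
  raise ArgumentError, "qos must be 0..2" unless (0..2).cover?(qos)
  raise ArgumentError, "retain_handling must be 0..2" unless (0..2).cover?(retain_handling)

  opts = qos                                  # bits 0..1
  opts |= 0b0000_0100 if no_local             # bit 2
  opts |= 0b0000_1000 if retain_as_published  # bit 3
  opts |= retain_handling << 4                # bits 4..5
  opts                                        # bits 6..7 stay 0
end

def unpack_subscribe_options(opts)
  {
    qos: opts & 0b11,
    no_local: (opts & 0b0100) != 0,
    retain_as_published: (opts & 0b1000) != 0,
    retain_handling: (opts >> 4) & 0b11
  }
end

byte = pack_subscribe_options(qos: 1, no_local: true, retain_handling: 2)
printf("0b%08b\n", byte) # prints 0b00100101
```

For an MQTT 3.1.1 connection only bits 0..1 are meaningful, so the same helper would be called with the QoS argument alone.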
Constant Summary
- TYPE_ID = SUBSCRIBE
Constants inherited from Protocol::MQTT::Packet
AUTH, CONNACK, CONNECT, DISCONNECT, PINGREQ, PINGRESP, PUBACK, PUBCOMP, PUBLISH, PUBREC, PUBREL, SUBACK, SUBSCRIBE, UNSUBACK, UNSUBSCRIBE
Instance Attribute Summary
- #filters ⇒ Object (readonly)
  Returns the value of attribute filters.
- #packet_id ⇒ Object (readonly)
  Returns the value of attribute packet_id.
- #properties ⇒ Object (readonly)
  Returns the value of attribute properties.
Class Method Summary
- .decode_body(reader, flags:, version:) ⇒ Object
Instance Method Summary
- #==(other) ⇒ Object (also: #eql?)
- #encode_body(version) ⇒ Object
- #flags_nibble(_version) ⇒ Object
- #hash ⇒ Object
- #initialize(packet_id:, filters:, properties: {}) ⇒ Subscribe (constructor)
  A new instance of Subscribe.
Methods inherited from Protocol::MQTT::Packet
decode, decode_from_body, #encode, encode_packet, register
Constructor Details
#initialize(packet_id:, filters:, properties: {}) ⇒ Subscribe
Returns a new instance of Subscribe.
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 32

def initialize(packet_id:, filters:, properties: {})
  raise ArgumentError, "SUBSCRIBE requires at least one filter" if filters.empty?
  @packet_id = packet_id
  @filters = filters.map { |f| normalize_filter(f) }
  @properties = properties
end
Instance Attribute Details
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def filters
  @filters
end
#packet_id ⇒ Object (readonly)
Returns the value of attribute packet_id.
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def packet_id
  @packet_id
end
#properties ⇒ Object (readonly)
Returns the value of attribute properties.
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def properties
  @properties
end
Class Method Details
.decode_body(reader, flags:, version:) ⇒ Object
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 75

def self.decode_body(reader, flags:, version:)
  raise MalformedPacket, "SUBSCRIBE reserved flags must be 0b0010" if flags != 0b0010
  packet_id = reader.read_u16
  properties = version == 5 ? Property.decode(reader) : {}
  filters = []
  while !reader.eof?
    filter = reader.read_utf8
    opts = reader.read_u8
    qos = opts & 0b11
    raise MalformedPacket, "invalid QoS 3 in SUBSCRIBE options" if qos == 3
    entry = { filter: filter, qos: qos }
    if version == 5
      entry[:no_local] = (opts & 0b0100) != 0
      entry[:retain_as_published] = (opts & 0b1000) != 0
      entry[:retain_handling] = (opts >> 4) & 0b11
      raise MalformedPacket, "invalid retain_handling 3" if entry[:retain_handling] == 3
      raise MalformedPacket, "reserved SUBSCRIBE option bits must be 0" if (opts & 0b11000000) != 0
    else
      raise MalformedPacket, "reserved SUBSCRIBE option bits must be 0" if (opts & 0b11111100) != 0
    end
    filters << entry
  end
  raise MalformedPacket, "SUBSCRIBE must carry at least one filter" if filters.empty?
  new(packet_id: packet_id, filters: filters, properties: properties)
end
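To make the non-v5 decoding path concrete, here is a minimal stand-alone sketch that reads the same layout from a binary String with `String#unpack1` instead of the library's reader object. The method name `decode_subscribe_body_v311` is hypothetical, `ArgumentError` stands in for `MalformedPacket`, and the truncation checks are an assumption about how a reader would behave on short input.

```ruby
# Hypothetical stand-alone decoder for an MQTT 3.1.1 SUBSCRIBE body
# (packet_id followed by filter entries). It applies the same QoS and
# reserved-bit checks as the non-v5 branch of decode_body.
def decode_subscribe_body_v311(bytes)
  bytes = bytes.b
  raise ArgumentError, "truncated packet_id" if bytes.bytesize < 2

  packet_id = bytes.unpack1("n") # big-endian u16
  offset = 2
  filters = []
  while offset < bytes.bytesize
    header = bytes.byteslice(offset, 2)
    raise ArgumentError, "truncated topic length" if header.bytesize < 2

    len = header.unpack1("n")
    topic = bytes.byteslice(offset + 2, len)
    opts = bytes.getbyte(offset + 2 + len)
    raise ArgumentError, "truncated entry" if topic.nil? || topic.bytesize < len || opts.nil?

    qos = opts & 0b11
    raise ArgumentError, "invalid QoS 3 in SUBSCRIBE options" if qos == 3
    raise ArgumentError, "reserved SUBSCRIBE option bits must be 0" if (opts & 0b1111_1100) != 0

    filters << { filter: topic.force_encoding(Encoding::UTF_8), qos: qos }
    offset += 2 + len + 1
  end
  raise ArgumentError, "SUBSCRIBE must carry at least one filter" if filters.empty?

  { packet_id: packet_id, filters: filters }
end

# One entry: topic "a/b" requested at QoS 1, packet_id 10.
result = decode_subscribe_body_v311("\x00\x0A\x00\x03a/b\x01".b)
p result
```

The sketch returns a plain Hash rather than a Subscribe instance so that it runs without the library loaded.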
Instance Method Details
#==(other) ⇒ Object Also known as: eql?
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 102

def ==(other)
  other.is_a?(Subscribe) &&
    other.packet_id == @packet_id &&
    other.filters == @filters &&
    other.properties == @properties
end
#encode_body(version) ⇒ Object
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 57

def encode_body(version)
  w = Codec::Writer.new
  w.write_u16(@packet_id)
  w.write(Property.encode(@properties)) if version == 5
  @filters.each do |f|
    w.write_utf8(f.fetch(:filter))
    opts = f.fetch(:qos, 0) & 0b11
    if version == 5
      opts |= 0b0100 if f[:no_local]
      opts |= 0b1000 if f[:retain_as_published]
      opts |= (f.fetch(:retain_handling, 0) & 0b11) << 4
    end
    w.write_u8(opts)
  end
  w.bytes
end
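The byte layout that encode_body produces for a non-v5 packet can be illustrated without Codec::Writer. The following is a sketch only: `encode_subscribe_body_v311` is a hypothetical name, and it assumes that `write_utf8` emits a big-endian u16 length prefix followed by the UTF-8 bytes, which is the MQTT string encoding.

```ruby
# Hypothetical stand-alone encoder mirroring the layout of encode_body for
# MQTT 3.1.1 (no property block). It does not use Codec::Writer.
def encode_subscribe_body_v311(packet_id, filters)
  body = [packet_id].pack("n")                   # packet_id as big-endian u16
  filters.each do |f|
    topic = f.fetch(:filter).encode(Encoding::UTF_8).b
    body << [topic.bytesize].pack("n")           # u16 length prefix
    body << topic                                # topic filter bytes
    body << [f.fetch(:qos, 0) & 0b11].pack("C")  # options byte: QoS only
  end
  body
end

body = encode_subscribe_body_v311(10, [{ filter: "a/b", qos: 1 }])
puts body.unpack1("H*") # prints 000a0003612f6201
```

Reading the hex output left to right: `000a` is the packet_id, `0003` the topic length, `612f62` the bytes of "a/b", and `01` the options byte requesting QoS 1.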
#flags_nibble(_version) ⇒ Object
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 52

def flags_nibble(_version)
  0b0010
end
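As a rough illustration of where this nibble ends up, the first byte of an MQTT fixed header carries the packet type in its high four bits and the flags in its low four bits. The constant names below are hypothetical and local to the sketch; the type value 8 for SUBSCRIBE comes from the MQTT specification.

```ruby
# Hypothetical local constants, not taken from Protocol::MQTT.
SUBSCRIBE_TYPE  = 8       # MQTT control packet type for SUBSCRIBE
SUBSCRIBE_FLAGS = 0b0010  # the value returned by flags_nibble

# Combine type (high nibble) and flags (low nibble) into the first byte.
first_byte = (SUBSCRIBE_TYPE << 4) | SUBSCRIBE_FLAGS
printf("0x%02X\n", first_byte) # prints 0x82

# Splitting a received first byte back into its two halves.
type  = first_byte >> 4
flags = first_byte & 0x0F
```

A decoder that sees any other low nibble on a SUBSCRIBE packet should treat the packet as malformed, which is the check decode_body performs on its `flags:` argument.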
#hash ⇒ Object
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 111

def hash
  [self.class, @packet_id, @filters, @properties].hash
end
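Defining #==, #eql?, and #hash consistently is what lets equal packets collapse in `Array#uniq` or serve as interchangeable Hash keys. The sketch below copies those three definitions onto a hypothetical stand-in class, `SubscribeLike`, so that it runs without the library; it omits the filter normalization and validation done by the real constructor.

```ruby
# Hypothetical stand-in with the same ==, eql?, and hash definitions as
# Subscribe, reduced to plain attributes so it runs on its own.
class SubscribeLike
  attr_reader :packet_id, :filters, :properties

  def initialize(packet_id:, filters:, properties: {})
    @packet_id = packet_id
    @filters = filters
    @properties = properties
  end

  def ==(other)
    other.is_a?(SubscribeLike) &&
      other.packet_id == @packet_id &&
      other.filters == @filters &&
      other.properties == @properties
  end
  alias eql? ==

  # Equal packets must produce equal hash values for Hash lookups to work.
  def hash
    [self.class, @packet_id, @filters, @properties].hash
  end
end

a = SubscribeLike.new(packet_id: 1, filters: [{ filter: "a/b", qos: 0 }])
b = SubscribeLike.new(packet_id: 1, filters: [{ filter: "a/b", qos: 0 }])

puts a == b            # prints true
puts a.hash == b.hash  # prints true
puts [a, b].uniq.size  # prints 1
```

Including `self.class` in the hashed array keeps packets of different types from sharing a hash value merely because their fields happen to match.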