Class: Protocol::MQTT::Packet::Subscribe

Inherits:
Protocol::MQTT::Packet show all
Defined in:
lib/protocol/mqtt/packet/subscribe.rb

Overview

SUBSCRIBE (§3.8). Fixed header flags nibble is 0b0010 (MQTT-3.8.1-1).

Variable header: packet_id (u16) + v5 property block. Payload: one or more subscription entries. Each entry is a topic filter (utf8) followed by an options byte.

Options byte layout:

* bits 0..1 — requested QoS (0..2)
* bit 2     — No Local (v5 only)
* bit 3     — Retain As Published (v5 only)
* bits 4..5 — Retain Handling (v5 only; 0/1/2)
* bits 6..7 — reserved (must be 0)

Each filters entry is a Hash:

{ filter:, qos: 0, no_local: false, retain_as_published: false,
  retain_handling: 0 }

Constant Summary collapse

TYPE_ID =
SUBSCRIBE

Constants inherited from Protocol::MQTT::Packet

AUTH, CONNACK, CONNECT, DISCONNECT, PINGREQ, PINGRESP, PUBACK, PUBCOMP, PUBLISH, PUBREC, PUBREL, SUBACK, SUBSCRIBE, UNSUBACK, UNSUBSCRIBE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Protocol::MQTT::Packet

decode, decode_from_body, #encode, encode_packet, register

Constructor Details

#initialize(packet_id:, filters:, properties: {}) ⇒ Subscribe

Returns a new instance of Subscribe.

Raises:

  • (ArgumentError)


32
33
34
35
36
37
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 32

def initialize(packet_id:, filters:, properties: {})
  raise ArgumentError, "SUBSCRIBE requires at least one filter" if filters.empty?
  @packet_id = packet_id
  @filters = filters.map { |f| normalize_filter(f) }
  @properties = properties
end

Instance Attribute Details

#filtersObject (readonly)

Returns the value of attribute filters.



29
30
31
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def filters
  @filters
end

#packet_idObject (readonly)

Returns the value of attribute packet_id.



29
30
31
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def packet_id
  @packet_id
end

#propertiesObject (readonly)

Returns the value of attribute properties.



29
30
31
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 29

def properties
  @properties
end

Class Method Details

.decode_body(reader, flags:, version:) ⇒ Object

Raises:



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 75

def self.decode_body(reader, flags:, version:)
  raise MalformedPacket, "SUBSCRIBE reserved flags must be 0b0010" if flags != 0b0010
  packet_id = reader.read_u16
  properties = version == 5 ? Property.decode(reader) : {}
  filters = []
  while !reader.eof?
    filter = reader.read_utf8
    opts = reader.read_u8
    qos = opts & 0b11
    raise MalformedPacket, "invalid QoS 3 in SUBSCRIBE options" if qos == 3
    entry = { filter: filter, qos: qos }
    if version == 5
      entry[:no_local]            = (opts & 0b0100) != 0
      entry[:retain_as_published] = (opts & 0b1000) != 0
      entry[:retain_handling]     = (opts >> 4) & 0b11
      raise MalformedPacket, "invalid retain_handling 3" if entry[:retain_handling] == 3
      raise MalformedPacket, "reserved SUBSCRIBE option bits must be 0" if (opts & 0b11000000) != 0
    else
      raise MalformedPacket, "reserved SUBSCRIBE option bits must be 0" if (opts & 0b11111100) != 0
    end
    filters << entry
  end
  raise MalformedPacket, "SUBSCRIBE must carry at least one filter" if filters.empty?
  new(packet_id: packet_id, filters: filters, properties: properties)
end

Instance Method Details

#==(other) ⇒ Object Also known as: eql?



102
103
104
105
106
107
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 102

def ==(other)
  other.is_a?(Subscribe) &&
    other.packet_id == @packet_id &&
    other.filters == @filters &&
    other.properties == @properties
end

#encode_body(version) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 57

def encode_body(version)
  w = Codec::Writer.new
  w.write_u16(@packet_id)
  w.write(Property.encode(@properties)) if version == 5
  @filters.each do |f|
    w.write_utf8(f.fetch(:filter))
    opts = f.fetch(:qos, 0) & 0b11
    if version == 5
      opts |= 0b0100 if f[:no_local]
      opts |= 0b1000 if f[:retain_as_published]
      opts |= (f.fetch(:retain_handling, 0) & 0b11) << 4
    end
    w.write_u8(opts)
  end
  w.bytes
end

#flags_nibble(_version) ⇒ Object



52
53
54
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 52

def flags_nibble(_version)
  0b0010
end

#hashObject



111
112
113
# File 'lib/protocol/mqtt/packet/subscribe.rb', line 111

def hash
  [self.class, @packet_id, @filters, @properties].hash
end