Class: Protocol::MQTT::Packet

Inherits:
Object
Defined in:
lib/protocol/mqtt/packet.rb,
lib/protocol/mqtt/packet/auth.rb,
lib/protocol/mqtt/packet/puback.rb,
lib/protocol/mqtt/packet/pubrec.rb,
lib/protocol/mqtt/packet/pubrel.rb,
lib/protocol/mqtt/packet/suback.rb,
lib/protocol/mqtt/packet/connack.rb,
lib/protocol/mqtt/packet/connect.rb,
lib/protocol/mqtt/packet/pingreq.rb,
lib/protocol/mqtt/packet/pubcomp.rb,
lib/protocol/mqtt/packet/publish.rb,
lib/protocol/mqtt/packet/pingresp.rb,
lib/protocol/mqtt/packet/unsuback.rb,
lib/protocol/mqtt/packet/subscribe.rb,
lib/protocol/mqtt/packet/disconnect.rb,
lib/protocol/mqtt/packet/unsubscribe.rb

Overview

Base class for all MQTT control packets.

Wire layout of every packet:

byte 0: [ type (4 bits) | flags (4 bits) ]
byte 1+: remaining-length, encoded as a Variable Byte Integer (VBI)
byte N+: variable header + payload (length = remaining-length)
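
The layout above can be sketched in plain Ruby without the library. The helper names below (encode_vbi, frame) are hypothetical and not part of Protocol::MQTT; the Variable Byte Integer rules follow the MQTT specification (7 data bits per byte, high bit used as a continuation flag).

```ruby
# Hypothetical helpers for illustration; not part of Protocol::MQTT.

# Encode a non-negative integer as an MQTT Variable Byte Integer:
# 7 data bits per byte, least significant group first, with the high
# bit set on every byte except the last.
def encode_vbi(value)
  raise ArgumentError, "VBI out of range" unless (0..268_435_455).cover?(value)

  out = "".b
  loop do
    byte = value & 0x7F
    value >>= 7
    byte |= 0x80 if value > 0
    out << byte
    break if value.zero?
  end
  out
end

# Build a full packet: byte 0, remaining-length, then the body.
def frame(type_id, flags, body)
  body = body.b
  first = ((type_id << 4) | (flags & 0x0F)).chr.b
  first + encode_vbi(body.bytesize) + body
end

pingreq = frame(12, 0, "")              # PINGREQ has an empty body
pubrel  = frame(6, 0b0010, "\x00\x01")  # PUBREL carries a 2-byte packet identifier
```

Bodies shorter than 128 bytes need a single remaining-length byte; each further VBI byte extends the range by another 7 bits, up to four bytes in total.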

Subclasses implement:

* +TYPE_ID+ (1..15) — the type nibble
* +#flags_nibble(version)+ — returns the low 4 bits of byte 0.
  Most packets return 0. PUBREL/SUBSCRIBE/UNSUBSCRIBE return
  0b0010. PUBLISH returns DUP|QoS|RETAIN.
* +#encode_body(version)+ — variable header + payload bytes.
* +Subclass.decode_body(reader, flags:, version:)+ — parses a
  Codec::Reader positioned at the start of the variable header
  and returns an instance.
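
As an illustration of the flags nibble described above, the sketch below packs and unpacks the PUBLISH flags using the bit positions from the MQTT specification (DUP is bit 3, QoS occupies bits 2..1, RETAIN is bit 0). The method names are hypothetical; this is not the library's Publish implementation.

```ruby
# Hypothetical helpers; the real Publish#flags_nibble may be shaped differently.

# Pack DUP, QoS, and RETAIN into the low 4 bits of byte 0.
def publish_flags(dup:, qos:, retain:)
  raise ArgumentError, "QoS must be 0, 1, or 2" unless (0..2).cover?(qos)

  (dup ? 0b1000 : 0) | (qos << 1) | (retain ? 0b0001 : 0)
end

# Inverse operation: split a flags nibble back into its three fields.
def parse_publish_flags(nibble)
  {
    dup: (nibble & 0b1000) != 0,
    qos: (nibble >> 1) & 0b11,
    retain: (nibble & 0b0001) != 0
  }
end

flags = publish_flags(dup: false, qos: 1, retain: true)
```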

Defined Under Namespace

Classes: Auth, Connack, Connect, Disconnect, Pingreq, Pingresp, Puback, Pubcomp, Publish, Pubrec, Pubrel, Suback, Subscribe, Unsuback, Unsubscribe

Constant Summary

CONNECT = 1
CONNACK = 2
PUBLISH = 3
PUBACK = 4
PUBREC = 5
PUBREL = 6
PUBCOMP = 7
SUBSCRIBE = 8
SUBACK = 9
UNSUBSCRIBE = 10
UNSUBACK = 11
PINGREQ = 12
PINGRESP = 13
DISCONNECT = 14
AUTH = 15

Class Attribute Details

.registry ⇒ Object (readonly)

Returns the registry, indexed by packet type ID, of Packet subclasses added through .register.



# File 'lib/protocol/mqtt/packet.rb', line 48

def registry
  @registry
end

Class Method Details

.decode(bytes, version:) ⇒ Object

Decodes the first packet in bytes, which must contain at least one complete packet. Returns [packet, bytes_consumed]. Raises MalformedPacket if the packet is truncated; the caller is responsible for buffering enough bytes beforehand. For streaming decode, use Protocol::MQTT::Connection.

Raises:

  • (MalformedPacket)



# File 'lib/protocol/mqtt/packet.rb', line 72

def decode(bytes, version:)
  reader = Codec::Reader.new(bytes)
  type_flags = reader.read_u8
  type = type_flags >> 4
  flags = type_flags & 0x0F
  length = reader.read_vbi
  raise MalformedPacket, "truncated packet body" if reader.remaining < length
  body = reader.read(length)
  [decode_from_body(type, flags, body, version: version), reader.pos]
end
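
To make the framing step concrete, here is a library-free sketch of what decode does before dispatch: split byte 0, read the Variable Byte Integer, and check that the whole body is present. split_frame is a hypothetical name, and it raises ArgumentError rather than the library's MalformedPacket so that the snippet stays self-contained.

```ruby
# Hypothetical stand-in for the framing half of Packet.decode.
# Returns [type, flags, body, bytes_consumed].
def split_frame(bytes)
  bytes = bytes.b
  raise ArgumentError, "truncated fixed header" if bytes.empty?

  type  = bytes.getbyte(0) >> 4     # high nibble of byte 0
  flags = bytes.getbyte(0) & 0x0F   # low nibble of byte 0

  # Variable Byte Integer: at most 4 bytes, 7 data bits each.
  length = 0
  pos = 1
  shift = 0
  loop do
    raise ArgumentError, "remaining-length longer than 4 bytes" if shift > 21
    byte = bytes.getbyte(pos) or raise ArgumentError, "truncated remaining-length"
    pos += 1
    length |= (byte & 0x7F) << shift
    shift += 7
    break if (byte & 0x80).zero?
  end

  raise ArgumentError, "truncated packet body" if bytes.bytesize - pos < length

  [type, flags, bytes.byteslice(pos, length), pos + length]
end

# Two packets back to back: PUBREL (packet identifier 1), then PINGREQ.
buffer = "\x62\x02\x00\x01\xC0\x00".b
type, flags, body, consumed = split_frame(buffer)
rest = buffer.byteslice(consumed..)  # bytes belonging to the next packet
```

Returning the consumed byte count alongside the packet is what lets a caller advance through a buffer that holds several packets, as the last two lines suggest.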

.decode_from_body(type, flags, body, version:) ⇒ Object

Dispatch a pre-framed packet body to its subclass’s decode_body. Used by streaming decoders (see Protocol::MQTT::Connection).

Raises:

  • (MalformedPacket) if type is not registered or decode_body leaves bytes unread



# File 'lib/protocol/mqtt/packet.rb', line 86

def decode_from_body(type, flags, body, version:)
  klass = @registry[type] or raise MalformedPacket, "unknown packet type #{type}"
  body_reader = Codec::Reader.new(body)
  pkt = klass.decode_body(body_reader, flags: flags, version: version)
  raise MalformedPacket, "#{klass.name} body overrun" unless body_reader.eof?
  pkt
end
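
The dispatch pattern can be sketched with toy classes. Everything below (ToyPacket, ToyPingreq, a plain String body in place of a Codec::Reader) is hypothetical and only mirrors the shape of register and decode_from_body; it is not the library's code.

```ruby
# Toy model of the registry; all names here are hypothetical.
class ToyPacket
  @registry = {}

  class << self
    attr_reader :registry

    # Associate a type nibble with the class that decodes it.
    def register(type_id, klass)
      @registry[type_id] = klass
    end

    # Look up the class for this type and hand it the framed body.
    def decode_from_body(type, flags, body)
      klass = ToyPacket.registry[type] or raise ArgumentError, "unknown packet type #{type}"
      klass.decode_body(body, flags: flags)
    end
  end
end

class ToyPingreq < ToyPacket
  TYPE_ID = 12

  def self.decode_body(body, flags:)
    raise ArgumentError, "PINGREQ body must be empty" unless body.empty?
    new
  end

  # Registered on the base class explicitly: a class-level instance
  # variable such as @registry is not shared with subclasses.
  ToyPacket.register(TYPE_ID, self)
end

packet = ToyPacket.decode_from_body(12, 0, "")
```

Because the lookup is keyed on the type nibble alone, supporting a new packet type in this toy model only requires defining a class and registering it.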

.encode_packet(packet, version:) ⇒ Object

Encode the packet’s fixed header + body into a frozen BINARY String.



# File 'lib/protocol/mqtt/packet.rb', line 58

def encode_packet(packet, version:)
  body = packet.encode_body(version)
  head = Codec::Writer.new
  head.write_u8((packet.class::TYPE_ID << 4) | packet.flags_nibble(version))
  head.write_vbi(body.bytesize)
  (head.bytes + body).force_encoding(Encoding::BINARY).freeze
end

.register(type_id, klass) ⇒ Object

Registers klass in the registry under type_id, so that decode_from_body can dispatch packets of that type to it.



# File 'lib/protocol/mqtt/packet.rb', line 51

def register(type_id, klass)
  @registry[type_id] = klass
end

Instance Method Details

#encode(version:) ⇒ Object

Encodes this packet to its wire form by delegating to Packet.encode_packet.



# File 'lib/protocol/mqtt/packet.rb', line 103

def encode(version:)
  Packet.encode_packet(self, version: version)
end

#flags_nibble(_version) ⇒ Object

Default flags nibble (overridden by PUBLISH, PUBREL, SUBSCRIBE, UNSUBSCRIBE).



# File 'lib/protocol/mqtt/packet.rb', line 98

def flags_nibble(_version)
  0
end