Class: Protocol::MQTT::Connection
- Inherits: Object
  - Object
    - Protocol::MQTT::Connection
- Defined in: lib/protocol/mqtt/connection.rb
Overview
Streaming packet framer over a buffered IO (e.g. Async::IO::Stream).
The only I/O coupling in the protocol-mqtt gem. Reads and writes whole Packet objects one at a time; does not implement the MQTT session state machine (that lives in the zzq runtime).
Wire version is set separately from construction because a server cannot know it until the CONNECT packet has been parsed. Pattern:
    conn = Protocol::MQTT::Connection.new(io) # defaults to v3
    connect = conn.read_packet
    conn.version = 5 # if connect says so
    # ... subsequent reads/writes branch on @version
Not fiber-safe: a single Connection should be owned by at most one reader fiber and one writer fiber. Concurrent writers must synchronize externally.
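One way to satisfy the external-synchronization requirement is to route every writer through a single lock. The sketch below is only an illustration: `SerializedWriter` and `RecordingConnection` are hypothetical names that do not exist in protocol-mqtt, and it assumes Ruby 3.0 or later, where a `Mutex` is held per fiber. Any object that responds to `write_packet` can be wrapped.

```ruby
# Hypothetical wrapper (not part of protocol-mqtt): serializes write_packet
# calls so that encoded packets from several writers never interleave.
class SerializedWriter
  def initialize(connection)
    @connection = connection
    @lock = Mutex.new
  end

  def write_packet(packet)
    @lock.synchronize { @connection.write_packet(packet) }
  end
end

# Stand-in for a Connection, used only to exercise the wrapper.
class RecordingConnection
  attr_reader :written

  def initialize
    @written = []
  end

  def write_packet(packet)
    @written << packet
  end
end

recorder = RecordingConnection.new
writer = SerializedWriter.new(recorder)

# Four concurrent writers, 25 packets each, all funneled through one lock.
workers = 4.times.map do |i|
  Thread.new { 25.times { |n| writer.write_packet("packet-#{i}-#{n}") } }
end
workers.each(&:join)
```

Reads need no such wrapper as long as a single fiber owns the read side, as the overview requires.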
Constant Summary
- DEFAULT_VERSION = 3
Default MQTT wire version (v3.1.1). Set via #version= after CONNECT is received.
Instance Attribute Summary
-
#io ⇒ #read, ...
readonly
Underlying IO.
-
#max_packet_size ⇒ Integer?
Maximum inbound packet size in bytes; nil = unlimited.
- #version ⇒ 3, 5
Instance Method Summary
-
#close ⇒ Object
Close the underlying IO.
- #closed? ⇒ Boolean
-
#initialize(io, version: DEFAULT_VERSION, max_packet_size: nil) ⇒ Connection
constructor
A new instance of Connection.
-
#read_connect_packet ⇒ Packet::Connect?
Reads a CONNECT packet from the IO, auto-detecting the wire version from the Protocol Level byte and setting #version before decoding the body.
-
#read_packet ⇒ Packet?
Reads one complete packet from the IO.
-
#write_packet(packet) ⇒ void
Encodes and writes one packet, then flushes.
Constructor Details
#initialize(io, version: DEFAULT_VERSION, max_packet_size: nil) ⇒ Connection
Returns a new instance of Connection.
    # File 'lib/protocol/mqtt/connection.rb', line 44
    def initialize(io, version: DEFAULT_VERSION, max_packet_size: nil)
      @io = io
      @version = version
      @max_packet_size = max_packet_size
      @closed = false
    end
Instance Attribute Details
#io ⇒ #read, ... (readonly)
Returns underlying IO.
    # File 'lib/protocol/mqtt/connection.rb', line 36
    def io
      @io
    end
#max_packet_size ⇒ Integer?
Returns maximum inbound packet size in bytes; nil = unlimited. When set, a packet whose decoded remaining length plus 2 (the minimum fixed-header size) exceeds the limit raises MalformedPacket before the body is consumed.
    # File 'lib/protocol/mqtt/connection.rb', line 41
    def max_packet_size
      @max_packet_size
    end
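The size guard used by #read_packet and #read_connect_packet can be isolated as a small predicate. This is a sketch only: `exceeds_max_packet_size?` is a hypothetical helper name, not a method of the gem, and it simply restates the `(length + 2) > @max_packet_size` comparison from the source listings.

```ruby
# Hypothetical helper restating the guard from the source listings:
# the limit is compared against the remaining length plus 2.
def exceeds_max_packet_size?(remaining_length, max_packet_size)
  return false if max_packet_size.nil? # nil means unlimited

  (remaining_length + 2) > max_packet_size
end

unlimited  = exceeds_max_packet_size?(10_000, nil) # no limit configured
at_limit   = exceeds_max_packet_size?(126, 128)    # 126 + 2 == 128, allowed
over_limit = exceeds_max_packet_size?(127, 128)    # 127 + 2 == 129, rejected
```

Because the comparison runs on the decoded remaining length, an oversized packet is rejected before any of its body is read from the IO.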
#version ⇒ 3, 5
    # File 'lib/protocol/mqtt/connection.rb', line 33
    def version
      @version
    end
Instance Method Details
#close ⇒ Object
Close the underlying IO. Idempotent.
    # File 'lib/protocol/mqtt/connection.rb', line 118
    def close
      return if @closed
      @closed = true
      @io.close
    end
#closed? ⇒ Boolean
    # File 'lib/protocol/mqtt/connection.rb', line 125
    def closed?
      @closed
    end
#read_connect_packet ⇒ Packet::Connect?
Reads a CONNECT packet from the IO, auto-detecting the wire version from the Protocol Level byte and setting #version before decoding the body. Brokers use this for the first packet on a newly accepted connection; after it returns, subsequent #read_packet / #write_packet calls honor the negotiated version.
    # File 'lib/protocol/mqtt/connection.rb', line 85
    def read_connect_packet
      first = @io.read(1)
      return nil if first.nil? || first.empty?

      type_flags = first.unpack1("C")
      type = type_flags >> 4
      flags = type_flags & 0x0F
      raise MalformedPacket, "expected CONNECT, got type #{type}" unless type == Packet::CONNECT

      length = read_vbi_from_io
      if @max_packet_size && (length + 2) > @max_packet_size
        raise MalformedPacket, "packet exceeds max_packet_size: #{length + 2} > #{@max_packet_size}"
      end
      body = length.zero? ? "".b : read_exactly(length)

      level = Packet::Connect.peek_protocol_level(body)
      @version = level == 5 ? 5 : 3
      Packet.decode_from_body(type, flags, body, version: @version)
    end
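The implementation of `Packet::Connect.peek_protocol_level` is not shown on this page. As a rough illustration of what such a peek involves, the sketch below relies on the MQTT CONNECT layout, in which the variable header starts with a length-prefixed protocol name followed by the Protocol Level byte (4 for v3.1.1, 5 for v5.0). The top-level method here is a hypothetical stand-in, not the gem's code.

```ruby
# Hypothetical stand-in for Packet::Connect.peek_protocol_level; the gem's
# own implementation may differ.
def peek_protocol_level(body)
  raise ArgumentError, "CONNECT body too short" if body.bytesize < 2

  name_length = body.unpack1("n")       # 2-byte big-endian protocol-name length
  level = body.getbyte(2 + name_length) # the byte right after the name
  raise ArgumentError, "CONNECT body too short" if level.nil?

  level
end

# Only the start of a CONNECT body is built here: protocol name, then level.
v5_body   = [4].pack("n") + "MQTT".b + [5].pack("C")
v311_body = [4].pack("n") + "MQTT".b + [4].pack("C")

v5_level   = peek_protocol_level(v5_body)
v311_level = peek_protocol_level(v311_body)

# Same mapping as in the listing above: level 5 selects v5, anything else v3.
v311_version = v311_level == 5 ? 5 : 3
```

Note that the mapping in #read_connect_packet treats every level other than 5 as version 3, so a v3.1.1 client (level 4) ends up with #version set to 3.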
#read_packet ⇒ Packet?
Reads one complete packet from the IO. Returns nil if the stream ends before the first header byte is read.
    # File 'lib/protocol/mqtt/connection.rb', line 57
    def read_packet
      first = @io.read(1)
      return nil if first.nil? || first.empty?

      type_flags = first.unpack1("C")
      type = type_flags >> 4
      flags = type_flags & 0x0F

      length = read_vbi_from_io
      if @max_packet_size && (length + 2) > @max_packet_size
        raise MalformedPacket, "packet exceeds max_packet_size: #{length + 2} > #{@max_packet_size}"
      end
      body = length.zero? ? "".b : read_exactly(length)

      Packet.decode_from_body(type, flags, body, version: @version)
    end
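The framing steps in #read_packet can be tried without the gem by running them against a `StringIO`. In the sketch below, `read_vbi` and `read_frame` are hypothetical helpers; `read_vbi` follows the MQTT Variable Byte Integer encoding (7 data bits per byte, high bit as continuation flag, at most 4 bytes) and is only a guess at what the private `read_vbi_from_io` does. The frame is returned as a plain Hash rather than a decoded Packet.

```ruby
require "stringio"

# Hypothetical helper: decode an MQTT Variable Byte Integer from an IO.
def read_vbi(io)
  value = 0
  shift = 0
  4.times do
    byte = io.read(1)
    raise EOFError, "stream ended inside remaining length" if byte.nil? || byte.empty?

    b = byte.unpack1("C")
    value |= (b & 0x7F) << shift      # low 7 bits carry data
    return value if (b & 0x80).zero?  # high bit clear: last byte

    shift += 7
  end
  raise ArgumentError, "variable byte integer longer than 4 bytes"
end

# Hypothetical helper: split one frame into type, flags, and raw body.
def read_frame(io)
  first = io.read(1)
  return nil if first.nil? || first.empty? # clean EOF between packets

  type_flags = first.unpack1("C")
  length = read_vbi(io)
  body = length.zero? ? "".b : io.read(length)
  { type: type_flags >> 4, flags: type_flags & 0x0F, body: body }
end

# First byte 0x30: type 3 in the high nibble, flags 0; remaining length 3.
frame = read_frame(StringIO.new([0x30, 0x03].pack("C*") + "abc".b))
two_byte_length = read_vbi(StringIO.new([0x80, 0x01].pack("C*")))
at_eof = read_frame(StringIO.new("".b))
```

Unlike the real method, this sketch does not check that the body was read in full; the source delegates that to `read_exactly`.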
#write_packet(packet) ⇒ void
This method returns an undefined value.
Encodes and writes one packet, then flushes. Raises ClosedError if the connection has already been closed.
    # File 'lib/protocol/mqtt/connection.rb', line 110
    def write_packet(packet)
      raise ClosedError, "connection closed" if @closed
      @io.write(packet.encode(version: @version))
      @io.flush
    end
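The interaction between #write_packet and #close can be exercised with local stand-ins. Everything in this sketch is hypothetical and defined locally so it runs without the gem: `SketchConnection` copies only the two method bodies listed on this page with the version fixed at 3, `FakePacket` returns pre-encoded bytes, and `ClosedError` is a local class rather than the gem's.

```ruby
require "stringio"

# Local stand-ins (all hypothetical) so the sketch runs without the gem.
ClosedError = Class.new(StandardError)

FakePacket = Struct.new(:bytes) do
  # Ignores the version and returns the bytes it was built with.
  def encode(version:)
    bytes
  end
end

class SketchConnection
  def initialize(io)
    @io = io
    @closed = false
  end

  def write_packet(packet)
    raise ClosedError, "connection closed" if @closed

    @io.write(packet.encode(version: 3))
    @io.flush
  end

  def close
    return if @closed # second and later calls are no-ops

    @closed = true
    @io.close
  end
end

io = StringIO.new("".b)
conn = SketchConnection.new(io)
pingreq = FakePacket.new([0xC0, 0x00].pack("C*")) # PINGREQ: type 12, length 0

conn.write_packet(pingreq)
conn.close
conn.close # idempotent: does not raise

write_after_close =
  begin
    conn.write_packet(pingreq)
    :written
  rescue ClosedError
    :rejected
  end
```

The write after close is rejected by the `@closed` guard before the closed IO is ever touched.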