Class: Protocol::MQTT::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/protocol/mqtt/connection.rb

Overview

Streaming packet framer over a buffered IO (e.g. Async::IO::Stream).

The only I/O coupling in the protocol-mqtt gem. Reads and writes whole Packet objects one at a time; does not implement the MQTT session state machine (that lives in the zzq runtime).

Wire version is set separately from construction because a server cannot know it until the CONNECT packet has been parsed. Pattern:

conn = Protocol::MQTT::Connection.new(io)  # defaults to v3
connect = conn.read_packet
conn.version = 5  # if connect says so
# ... subsequent reads/writes branch on @version

Not fiber-safe: a single Connection should be owned by at most one reader fiber and one writer fiber. Concurrent writers must synchronize externally.

Constant Summary collapse

DEFAULT_VERSION =

Default MQTT wire version (v3.1.1). Set via #version= after CONNECT is received.

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, version: DEFAULT_VERSION, max_packet_size: nil) ⇒ Connection

Returns a new instance of Connection.



44
45
46
47
48
49
# File 'lib/protocol/mqtt/connection.rb', line 44

def initialize(io, version: DEFAULT_VERSION, max_packet_size: nil)
  @io = io
  @version = version
  @max_packet_size = max_packet_size
  @closed = false
end

Instance Attribute Details

#io#read, ... (readonly)

Returns underlying IO.

Returns:

  • (#read, #read_exactly, #write, #flush, #close)

    underlying IO



36
37
38
# File 'lib/protocol/mqtt/connection.rb', line 36

def io
  @io
end

#max_packet_sizeInteger?

Returns maximum inbound packet size in bytes; nil = unlimited. When set, a decoded remaining-length > max raises MalformedPacket before body is consumed.

Returns:

  • (Integer, nil)

    maximum inbound packet size in bytes; nil = unlimited. When set, a decoded remaining-length > max raises MalformedPacket before body is consumed.



41
42
43
# File 'lib/protocol/mqtt/connection.rb', line 41

def max_packet_size
  @max_packet_size
end

#version3, 5

Returns:

  • (3, 5)


33
34
35
# File 'lib/protocol/mqtt/connection.rb', line 33

def version
  @version
end

Instance Method Details

#closeObject

Close the underlying IO. Idempotent.



118
119
120
121
122
# File 'lib/protocol/mqtt/connection.rb', line 118

def close
  return if @closed
  @closed = true
  @io.close
end

#closed?Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/protocol/mqtt/connection.rb', line 125

def closed?
  @closed
end

#read_connect_packetPacket::Connect?

Reads a CONNECT packet from the IO, auto-detecting the wire version from the Protocol Level byte and setting #version before decoding the body. Brokers use this for the first packet on a newly accepted connection; after it returns, subsequent #read_packet / #write_packet calls honor the negotiated version.

Returns:

Raises:

  • (MalformedPacket)

    if the first packet isn’t CONNECT or the protocol level is unsupported.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/protocol/mqtt/connection.rb', line 85

def read_connect_packet
  first = @io.read(1)
  return nil if first.nil? || first.empty?

  type_flags = first.unpack1("C")
  type = type_flags >> 4
  flags = type_flags & 0x0F
  raise MalformedPacket, "expected CONNECT, got type #{type}" unless type == Packet::CONNECT

  length = read_vbi_from_io
  if @max_packet_size && (length + 2) > @max_packet_size
    raise MalformedPacket, "packet exceeds max_packet_size: #{length + 2} > #{@max_packet_size}"
  end

  body = length.zero? ? "".b : read_exactly(length)
  level = Packet::Connect.peek_protocol_level(body)
  @version = level == 5 ? 5 : 3
  Packet.decode_from_body(type, flags, body, version: @version)
end

#read_packetPacket?

Reads one complete packet from the IO.

Returns:

  • (Packet, nil)

    decoded packet, or nil on clean EOF at a packet boundary.

Raises:



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/protocol/mqtt/connection.rb', line 57

def read_packet
  first = @io.read(1)
  return nil if first.nil? || first.empty?

  type_flags = first.unpack1("C")
  type = type_flags >> 4
  flags = type_flags & 0x0F

  length = read_vbi_from_io
  if @max_packet_size && (length + 2) > @max_packet_size
    raise MalformedPacket, "packet exceeds max_packet_size: #{length + 2} > #{@max_packet_size}"
  end

  body = length.zero? ? "".b : read_exactly(length)
  Packet.decode_from_body(type, flags, body, version: @version)
end

#write_packet(packet) ⇒ void

This method returns an undefined value.

Encodes and writes one packet, then flushes.

Parameters:

Raises:



110
111
112
113
114
# File 'lib/protocol/mqtt/connection.rb', line 110

def write_packet(packet)
  raise ClosedError, "connection closed" if @closed
  @io.write(packet.encode(version: @version))
  @io.flush
end