Module: ZZQ::Transport::MQTTS

Defined in:
lib/zzq/transport/mqtts.rb

Overview

MQTT-over-TLS transport — endpoint scheme ‘mqtts://host:port` (default port 8883).

Both bind and connect accept a ‘tls_context:` option — any OpenSSL::SSL::SSLContext. On bind the context must carry a cert + key; on connect a default client context is used if none is given. The transport does not constrain the context; see the README for a recommended TLS 1.3 + `TLS_CHACHA20_POLY1305_SHA256` configuration.

Per-connection application data is capped at DATA_LIMIT bytes, after which the socket is torn down and the peer must reconnect. This keeps AEAD nonce counters well below their unsafe bound (e.g. ChaCha20-Poly1305’s ~256 GiB per-key integrity bound) without relying on a specific cipher choice.

Defined Under Namespace

Classes: LimitedStream, Listener

Constant Summary collapse

DATA_LIMIT =

128 GiB — conservative for every AEAD in TLS 1.3 (ChaCha20- Poly1305 at ~256 GiB, AES-GCM limits are higher under TLS 1.3 record sizes). Reconnection negotiates fresh keys, so this caps nonce reuse exposure regardless of cipher choice.

128 * 1024 * 1024 * 1024

Class Method Summary collapse

Class Method Details

.bind(endpoint, engine, tls_context: nil) ⇒ Object

Raises:



38
39
40
41
42
43
44
45
46
47
# File 'lib/zzq/transport/mqtts.rb', line 38

def bind(endpoint, engine, tls_context: nil, **)
  raise Error, "mqtts:// bind requires tls_context:" unless tls_context

  host, port = parse_endpoint(endpoint)
  host       = "0.0.0.0" if host == "*"
  server     = TCPServer.new(host, port)
  actual     = server.local_address.ip_port
  host_part  = host.include?(":") ? "[#{host}]" : host
  Listener.new("mqtts://#{host_part}:#{actual}", server, tls_context, engine)
end

.connect(endpoint, engine, tls_context: nil) ⇒ Object



50
51
52
53
54
55
56
57
58
59
# File 'lib/zzq/transport/mqtts.rb', line 50

def connect(endpoint, engine, tls_context: nil, **)
  host, port = parse_endpoint(endpoint)
  ctx = tls_context || OpenSSL::SSL::SSLContext.new
  tcp = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
  ssl = OpenSSL::SSL::SSLSocket.new(tcp, ctx)
  ssl.hostname = host  # SNI
  ssl.sync_close = true
  ssl.connect
  engine.handle_connected(LimitedStream.wrap(ssl), endpoint: endpoint)
end

.connect_timeout(options) ⇒ Object



62
63
64
65
66
# File 'lib/zzq/transport/mqtts.rb', line 62

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end

.parse_endpoint(endpoint) ⇒ Object



69
70
71
72
# File 'lib/zzq/transport/mqtts.rb', line 69

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub("mqtts://", "tcp://"))
  [uri.hostname, uri.port]
end