Module: ZZQ::Transport::MQTTS
Defined in: lib/zzq/transport/mqtts.rb
Overview
MQTT-over-TLS transport. Endpoint scheme: `mqtts://host:port` (default port 8883).
Both bind and connect accept a `tls_context:` option, which may be any OpenSSL::SSL::SSLContext. On bind the context must carry a cert + key; on connect a default client context is used if none is given. The transport does not constrain the context; see the README for a recommended TLS 1.3 + `TLS_CHACHA20_POLY1305_SHA256` configuration.
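Since the transport does not constrain the context, any verification policy has to come from the caller. To my understanding, a freshly constructed `OpenSSL::SSL::SSLContext` in Ruby's openssl library does not verify the peer certificate until it is configured, so clients that need verification will probably want to pass their own context. The following is a minimal sketch of such a client context, not the README's recommended configuration; the helper name `strict_client_context` and its `ca_file:` parameter are hypothetical.

```ruby
require "openssl"

# Hypothetical helper: builds a client context that verifies the peer
# and, where the local OpenSSL build allows it, pins TLS 1.3 with the
# ChaCha20-Poly1305 suite mentioned above.
def strict_client_context(ca_file: nil)
  ctx = OpenSSL::SSL::SSLContext.new
  # set_params applies the library defaults and enables certificate and
  # hostname verification against the default trust store.
  ctx.set_params(verify_mode: OpenSSL::SSL::VERIFY_PEER)
  if defined?(OpenSSL::SSL::TLS1_3_VERSION)
    ctx.min_version = OpenSSL::SSL::TLS1_3_VERSION
  end
  # ciphersuites= is only present in newer releases of the openssl gem.
  if ctx.respond_to?(:ciphersuites=)
    ctx.ciphersuites = "TLS_CHACHA20_POLY1305_SHA256"
  end
  ctx.ca_file = ca_file if ca_file
  ctx
end
```

A context built this way would be passed as `tls_context: strict_client_context` to connect.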
Per-connection application data is capped at DATA_LIMIT bytes, after which the socket is torn down and the peer must reconnect. This keeps AEAD nonce counters well below their unsafe bound (e.g. ChaCha20-Poly1305’s ~256 GiB per-key integrity bound) without relying on a specific cipher choice.
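The byte cap can be pictured as a thin wrapper that counts traffic in both directions and closes the underlying stream once the limit is reached. The sketch below is an illustration under that assumption only; it is not the library's LimitedStream, and the class name `ByteCappedIO` and its error class are hypothetical.

```ruby
require "stringio"

# Hypothetical illustration of a byte-capped stream; not ZZQ's LimitedStream.
class ByteCappedIO
  class LimitReached < StandardError; end

  attr_reader :bytes_transferred

  def initialize(io, limit)
    @io = io
    @limit = limit
    @bytes_transferred = 0
  end

  # Writes data, then counts the bytes actually written.
  def write(data)
    written = @io.write(data)
    account(written)
    written
  end

  # Reads up to maxlen bytes, then counts the bytes actually read.
  def readpartial(maxlen)
    chunk = @io.readpartial(maxlen)
    account(chunk.bytesize)
    chunk
  end

  def close
    @io.close unless @io.closed?
  end

  private

  # Closes the stream and raises once the running total hits the limit,
  # so the caller has to reconnect and thereby negotiate fresh keys.
  def account(count)
    @bytes_transferred += count
    return if @bytes_transferred < @limit

    close
    raise LimitReached, "byte limit of #{@limit} reached"
  end
end
```

In this sketch the operation that crosses the limit still completes before the stream is closed; whether the real transport behaves the same way at the boundary is not stated in this page.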
Defined Under Namespace
Classes: LimitedStream, Listener
Constant Summary
- DATA_LIMIT =
128 GiB: conservative for every AEAD in TLS 1.3 (ChaCha20-Poly1305 at ~256 GiB; AES-GCM limits are higher under TLS 1.3 record sizes). Reconnection negotiates fresh keys, so this caps nonce reuse exposure regardless of cipher choice.
128 * 1024 * 1024 * 1024
Class Method Summary
- .bind(endpoint, engine, tls_context: nil) ⇒ Object
- .connect(endpoint, engine, tls_context: nil) ⇒ Object
- .connect_timeout(options) ⇒ Object
- .parse_endpoint(endpoint) ⇒ Object
Class Method Details
.bind(endpoint, engine, tls_context: nil) ⇒ Object
# File 'lib/zzq/transport/mqtts.rb', line 38

def bind(endpoint, engine, tls_context: nil, **)
  raise Error, "mqtts:// bind requires tls_context:" unless tls_context

  host, port = parse_endpoint(endpoint)
  host = "0.0.0.0" if host == "*"
  server = TCPServer.new(host, port)
  actual = server.local_address.ip_port
  host_part = host.include?(":") ? "[#{host}]" : host
  Listener.new("mqtts://#{host_part}:#{actual}", server, tls_context, engine)
end
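Because bind raises when no `tls_context:` is given, a server needs a context carrying a certificate and key before it can listen. For local testing, one way to obtain such a context is a throwaway self-signed certificate, as in the sketch below. The helper name `self_signed_server_context` is hypothetical, and a production deployment would load a certificate issued by a trusted CA instead.

```ruby
require "openssl"

# Hypothetical helper for local testing only: builds a server context
# with a short-lived, self-signed certificate for the given host name.
def self_signed_server_context(common_name = "localhost")
  key = OpenSSL::PKey::RSA.new(2048)

  cert = OpenSSL::X509::Certificate.new
  cert.version = 2 # X.509 v3
  cert.serial = 1
  cert.subject = OpenSSL::X509::Name.parse("/CN=#{common_name}")
  cert.issuer = cert.subject # self-signed
  cert.public_key = key.public_key
  cert.not_before = Time.now - 60
  cert.not_after = Time.now + 24 * 60 * 60

  # Clients that verify host names generally expect a subjectAltName.
  factory = OpenSSL::X509::ExtensionFactory.new
  factory.subject_certificate = cert
  factory.issuer_certificate = cert
  cert.add_extension(
    factory.create_extension("subjectAltName", "DNS:#{common_name}", false)
  )
  cert.sign(key, OpenSSL::Digest.new("SHA256"))

  ctx = OpenSSL::SSL::SSLContext.new
  ctx.cert = cert
  ctx.key = key
  ctx
end
```

The resulting context would be passed as `tls_context:` to bind. A verifying client would additionally need to trust this certificate, for example by loading it as its CA file.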
.connect(endpoint, engine, tls_context: nil) ⇒ Object
# File 'lib/zzq/transport/mqtts.rb', line 50

def connect(endpoint, engine, tls_context: nil, **)
  host, port = parse_endpoint(endpoint)
  ctx = tls_context || OpenSSL::SSL::SSLContext.new
  tcp = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
  ssl = OpenSSL::SSL::SSLSocket.new(tcp, ctx)
  ssl.hostname = host # SNI
  ssl.sync_close = true
  ssl.connect
  engine.handle_connected(LimitedStream.wrap(ssl), endpoint: endpoint)
end
.connect_timeout(options) ⇒ Object
# File 'lib/zzq/transport/mqtts.rb', line 62

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end
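The timeout is derived from the reconnect interval: a Range contributes its upper end, and the result never drops below half a second. The standalone sketch below copies that logic so it can be tried outside the library; `StubOptions` is a hypothetical stand-in for the engine's options object, of which only `reconnect_interval` is assumed.

```ruby
# Hypothetical stand-in for the engine's options object.
StubOptions = Struct.new(:reconnect_interval)

# Standalone copy of the logic in the listing above.
def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)
  [ri, 0.5].max
end

connect_timeout(StubOptions.new(0.1))   # => 0.5 (floor applies)
connect_timeout(StubOptions.new(2))     # => 2
connect_timeout(StubOptions.new(1..10)) # => 10 (upper end of the range)
```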
.parse_endpoint(endpoint) ⇒ Object
# File 'lib/zzq/transport/mqtts.rb', line 69

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub("mqtts://", "tcp://"))
  [uri.hostname, uri.port]
end
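To see what this parsing yields for typical endpoints, the method can be copied into a standalone script, as sketched below. The host names are placeholders.

```ruby
require "uri"

# Standalone copy of the method in the listing above.
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub("mqtts://", "tcp://"))
  [uri.hostname, uri.port]
end

parse_endpoint("mqtts://broker.example:8883") # => ["broker.example", 8883]
parse_endpoint("mqtts://[::1]:8883")          # => ["::1", 8883]
parse_endpoint("mqtts://broker.example")      # => ["broker.example", nil]
```

`URI#hostname` strips the brackets from an IPv6 literal, which is why bind adds them back when it builds the listener endpoint. Ruby's URI library has no default port for the `tcp` scheme, so in this standalone copy an endpoint without a port yields `nil`; where the documented default of 8883 is applied is not visible in this listing.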