Module: ZZQ::WebSocket::Transport
- Defined in:
- lib/zzq/websocket/transport.rb
Overview
Shared MQTT-over-WebSocket transport for `ws://` and `wss://`. Registered on both schemes at require time. The only difference between the two is whether the parsed endpoint carries an `ssl_context:`, which is supplied via the `tls_context:` kwarg on `bind` and `connect`, matching the `mqtts://` transport convention.
Defined Under Namespace
Classes: Listener
Constant Summary
- DEFAULT_SUBPROTOCOLS =
Advertised by the client by default. "mqtt" is the modern MQTT v3.1.1 + v5 subprotocol name; "mqttv3.1" is the legacy name still used by some clients; "mqttv3.1.1" is an older convention that occasionally shows up in the wild. "mqtt" is listed first so brokers pick it over the legacy aliases.
%w[mqtt mqttv3.1.1 mqttv3.1].freeze
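The ordering matters because a WebSocket peer that honours the offered order will settle on the first name both sides understand. The sketch below illustrates that idea only; `pick_subprotocol` and the `SKETCH_OFFERED` constant are hypothetical names, not part of ZZQ, and a real broker may apply its own preference order instead.

```ruby
# Hypothetical illustration: not ZZQ code.
# The offered list mirrors the value of DEFAULT_SUBPROTOCOLS shown above.
SKETCH_OFFERED = %w[mqtt mqttv3.1.1 mqttv3.1].freeze

# Returns the first offered subprotocol that the other side also supports,
# or nil when there is no overlap (assumed negotiation rule).
def pick_subprotocol(offered, supported)
  offered.find { |name| supported.include?(name) }
end

modern_broker = %w[mqttv3.1 mqtt]
legacy_broker = %w[mqttv3.1]

chosen_modern = pick_subprotocol(SKETCH_OFFERED, modern_broker)
chosen_legacy = pick_subprotocol(SKETCH_OFFERED, legacy_broker)
chosen_none   = pick_subprotocol(%w[mqtt], legacy_broker)
```

Under this assumed rule, a peer that supports both the modern and a legacy name ends up on "mqtt", while a legacy-only peer still finds a match through the aliases.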
Class Method Summary
- .bind(endpoint, engine, tls_context: nil, subprotocols: DEFAULT_SUBPROTOCOLS, path: nil) ⇒ Object
- .connect(endpoint, engine, tls_context: nil, subprotocols: %w[mqtt], headers: nil) ⇒ Object
- .parse_http_endpoint(endpoint, tls_context) ⇒ Object
Class Method Details
.bind(endpoint, engine, tls_context: nil, subprotocols: DEFAULT_SUBPROTOCOLS, path: nil) ⇒ Object
# File 'lib/zzq/websocket/transport.rb', line 32

def bind(endpoint, engine, tls_context: nil, subprotocols: DEFAULT_SUBPROTOCOLS, path: nil, **)
  scheme = endpoint[/\A([a-z]+):\/\//, 1]
  raise Error, "wss:// bind requires tls_context:" if scheme == "wss" && tls_context.nil?

  http_endpoint = parse_http_endpoint(endpoint, tls_context)
  bound = http_endpoint.bound
  port = bound.sockets.first.to_io.local_address.ip_port
  host = http_endpoint.hostname
  host_part = host.include?(":") ? "[#{host}]" : host
  url_path = http_endpoint.url.path
  match_path = path || (url_path.empty? ? nil : url_path)
  shown = "#{scheme}://#{host_part}:#{port}#{match_path}"

  Listener.new(
    shown_endpoint: shown,
    bound: bound,
    http_endpoint: http_endpoint,
    subprotocols: subprotocols,
    match_path: match_path,
  )
end
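The displayed endpoint is assembled from the scheme, the host (bracketed when it contains a colon, as IPv6 literals do), the port read back from the bound socket, and the optional match path. The sketch below reproduces just that string-building step in isolation. `shown_endpoint` is a hypothetical helper, not part of ZZQ; in the real method the host, port, and URL path come from the parsed endpoint and the bound socket rather than from keyword arguments.

```ruby
# Hypothetical illustration of how bind composes the endpoint string it reports.
# host, port and url_path are passed in here; the real code derives them
# from the parsed HTTP endpoint and the bound socket.
def shown_endpoint(endpoint, host:, port:, url_path: "", path: nil)
  scheme = endpoint[/\A([a-z]+):\/\//, 1]
  # IPv6 literals contain ":" and must be bracketed inside a URL.
  host_part = host.include?(":") ? "[#{host}]" : host
  # An explicit path: wins; an empty URL path means "no path restriction".
  match_path = path || (url_path.empty? ? nil : url_path)
  "#{scheme}://#{host_part}:#{port}#{match_path}"
end

ipv6_shown = shown_endpoint("ws://[::1]:0/mqtt", host: "::1", port: 54321, url_path: "/mqtt")
ipv4_shown = shown_endpoint("ws://127.0.0.1:0", host: "127.0.0.1", port: 8083)
path_shown = shown_endpoint("wss://broker.test:0/a", host: "broker.test", port: 443,
                            url_path: "/a", path: "/b")
```

Because the port is read from the bound socket, an endpoint requested with port 0 is reported with the port the operating system actually assigned. The host names and port numbers above are made-up sample values.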
.connect(endpoint, engine, tls_context: nil, subprotocols: %w[mqtt], headers: nil) ⇒ Object
# File 'lib/zzq/websocket/transport.rb', line 55

def connect(endpoint, engine, tls_context: nil, subprotocols: %w[mqtt], headers: nil, **)
  http_endpoint = parse_http_endpoint(endpoint, tls_context)
  client_conn = Async::WebSocket::Client.connect(
    http_endpoint,
    protocols: subprotocols,
    headers: headers,
  )
  stream = Stream.wrap(client_conn)
  engine.handle_connected(stream, endpoint: endpoint)
end
.parse_http_endpoint(endpoint, tls_context) ⇒ Object
# File 'lib/zzq/websocket/transport.rb', line 67

def parse_http_endpoint(endpoint, tls_context)
  if tls_context
    Async::HTTP::Endpoint.parse(endpoint, ssl_context: tls_context)
  else
    Async::HTTP::Endpoint.parse(endpoint)
  end
end
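The branch above only decides whether an `ssl_context:` option is forwarded to the endpoint parser. A minimal sketch of that decision, using only the Ruby standard library, might look like the following. `endpoint_options` is a hypothetical helper, not part of ZZQ, and the context settings are an assumed typical client configuration rather than anything the library mandates.

```ruby
require "openssl"

# Hypothetical helper: the extra options a parser call would receive.
# With no TLS context nothing is added; otherwise the context is forwarded
# under the ssl_context: key, mirroring the two branches shown above.
def endpoint_options(tls_context)
  tls_context ? { ssl_context: tls_context } : {}
end

# Assumed client-side context: verify the peer against the system trust store.
client_ctx = OpenSSL::SSL::SSLContext.new
client_ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
client_ctx.cert_store = OpenSSL::X509::Store.new.tap(&:set_default_paths)

plain_opts = endpoint_options(nil)
tls_opts   = endpoint_options(client_ctx)
```

A context built this way is the kind of object one would expect to pass as `tls_context:` to `bind` or `connect`; a server-side context would additionally need a certificate and key.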