Module: ZZQ::WebSocket::Transport

Defined in:
lib/zzq/websocket/transport.rb

Overview

Shared MQTT-over-WebSocket transport for ‘ws://` and `wss://`. Registered on both schemes at require time. The only difference between the two is whether the parsed endpoint carries an `ssl_context:` — supplied via the `tls_context:` kwarg on `bind` and `connect`, matching the `mqtts://` transport convention.

Defined Under Namespace

Classes: Listener

Constant Summary collapse

DEFAULT_SUBPROTOCOLS =

Advertised by the client by default — “mqtt” is the modern MQTT v3.1.1 + v5 subprotocol name; “mqttv3.1” is the legacy name still used by some clients. “mqttv3.1.1” is an older convention that occasionally shows up in the wild. “mqtt” is listed first so brokers pick it over the legacy aliases.

%w[mqtt mqttv3.1.1 mqttv3.1].freeze

Class Method Summary collapse

Class Method Details

.bind(endpoint, engine, tls_context: nil, subprotocols: DEFAULT_SUBPROTOCOLS, path: nil) ⇒ Object

Raises:

  • (Error)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/zzq/websocket/transport.rb', line 32

def bind(endpoint, engine, tls_context: nil, subprotocols: DEFAULT_SUBPROTOCOLS, path: nil, **)
  scheme = endpoint[/\A([a-z]+):\/\//, 1]
  raise Error, "wss:// bind requires tls_context:" if scheme == "wss" && tls_context.nil?

  http_endpoint = parse_http_endpoint(endpoint, tls_context)
  bound         = http_endpoint.bound
  port          = bound.sockets.first.to_io.local_address.ip_port
  host          = http_endpoint.hostname
  host_part     = host.include?(":") ? "[#{host}]" : host
  url_path      = http_endpoint.url.path
  match_path    = path || (url_path.empty? ? nil : url_path)
  shown         = "#{scheme}://#{host_part}:#{port}#{match_path}"

  Listener.new(
    shown_endpoint: shown,
    bound:          bound,
    http_endpoint:  http_endpoint,
    subprotocols:   subprotocols,
    match_path:     match_path,
  )
end

.connect(endpoint, engine, tls_context: nil, subprotocols: %w[mqtt],, headers: nil) ⇒ Object



55
56
57
58
59
60
61
62
63
64
# File 'lib/zzq/websocket/transport.rb', line 55

def connect(endpoint, engine, tls_context: nil, subprotocols: %w[mqtt], headers: nil, **)
  http_endpoint = parse_http_endpoint(endpoint, tls_context)
  client_conn   = Async::WebSocket::Client.connect(
    http_endpoint,
    protocols: subprotocols,
    headers:   headers,
  )
  stream = Stream.wrap(client_conn)
  engine.handle_connected(stream, endpoint: endpoint)
end

.parse_http_endpoint(endpoint, tls_context) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/zzq/websocket/transport.rb', line 67

def parse_http_endpoint(endpoint, tls_context)
  if tls_context
    Async::HTTP::Endpoint.parse(endpoint, ssl_context: tls_context)
  else
    Async::HTTP::Endpoint.parse(endpoint)
  end
end