Module: ZZQ::Transport::MQTT

Defined in:
lib/zzq/transport/mqtt.rb

Overview

Plain MQTT-over-TCP transport. Endpoint scheme: `mqtt://host:port` (default port 1883). One server per bind, blocking accept inside an Async fiber.

Defined Under Namespace

Classes: Listener

Class Method Summary collapse

Class Method Details

.bind(endpoint, engine) ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/zzq/transport/mqtt.rb', line 18

# Opens a TCP server for +endpoint+ and wraps it in a Listener.
#
# A host of "*" is bound as "0.0.0.0". The endpoint string given to the
# Listener is rebuilt from the port the server actually reports
# (+local_address.ip_port+), and a host containing ":" is wrapped in
# square brackets.
#
# @param endpoint [String] endpoint handed to +parse_endpoint+
# @param engine [Object] passed through unchanged to +Listener.new+
# @return [Listener] the newly created listener
def bind(endpoint, engine, **)
  address, requested_port = parse_endpoint(endpoint)
  address = "0.0.0.0" if address == "*"

  tcp_server = TCPServer.new(address, requested_port)
  bound_port = tcp_server.local_address.ip_port

  # Hosts containing a colon are bracketed when rebuilding the endpoint.
  display_host =
    if address.include?(":")
      "[#{address}]"
    else
      address
    end

  Listener.new("mqtt://#{display_host}:#{bound_port}", tcp_server, bound_port, engine)
end

.connect(endpoint, engine) ⇒ Object



28
29
30
31
32
# File 'lib/zzq/transport/mqtt.rb', line 28

# Opens an outgoing TCP connection to +endpoint+ and hands the buffered
# stream to the engine.
#
# The connect timeout comes from +connect_timeout(engine.options)+.
#
# @param endpoint [String] endpoint handed to +parse_endpoint+; also
#   forwarded unchanged to +engine.handle_connected+
# @param engine [Object] must respond to +options+ and +handle_connected+
# @return [Object] whatever +engine.handle_connected+ returns
def connect(endpoint, engine, **)
  host, port = parse_endpoint(endpoint)
  timeout    = connect_timeout(engine.options)
  socket     = ::Socket.tcp(host, port, connect_timeout: timeout)
  stream     = IO::Stream::Buffered.wrap(socket)

  engine.handle_connected(stream, endpoint: endpoint)
end

.connect_timeout(options) ⇒ Object



35
36
37
38
39
# File 'lib/zzq/transport/mqtt.rb', line 35

# Derives a connect timeout from +options.reconnect_interval+.
#
# When the interval is a Range its upper bound (+Range#end+) is used;
# otherwise the value is used as-is. The result is never below 0.5.
#
# @param options [#reconnect_interval] object exposing the interval
# @return [Numeric] the larger of the interval (or its upper bound) and 0.5
def connect_timeout(options)
  interval = options.reconnect_interval

  upper_bound =
    case interval
    when Range then interval.end
    else interval
    end

  [upper_bound, 0.5].max
end

.parse_endpoint(endpoint) ⇒ Object



42
43
44
45
# File 'lib/zzq/transport/mqtt.rb', line 42

# Splits an endpoint string into host and port.
#
# A leading "mqtt://" is rewritten to "tcp://" before parsing; only a
# prefix at the very start of the string is rewritten. When the endpoint
# carries no port, the MQTT default 1883 is returned instead of nil.
# An explicit port (including 0) is returned unchanged.
#
# @param endpoint [String] e.g. "mqtt://host:1883" or "mqtt://[::1]:1883"
# @return [Array(String, Integer)] host (IPv6 brackets removed by
#   +URI#hostname+) and port
# @raise [URI::InvalidURIError] if the endpoint cannot be parsed
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub(%r{\Amqtt://}, "tcp://"))
  # URI#port is nil for a portless tcp:// URI, so fall back to 1883 here.
  [uri.hostname, uri.port || 1883]
end