Module: ZZQ::Transport::MQTT
Defined in: lib/zzq/transport/mqtt.rb
Overview
Plain MQTT-over-TCP transport. Endpoint scheme: `mqtt://host:port` (default port 1883). One server per bind, blocking accept inside an Async fiber.
Defined Under Namespace
Classes: Listener
Class Method Summary
- .bind(endpoint, engine) ⇒ Object
- .connect(endpoint, engine) ⇒ Object
- .connect_timeout(options) ⇒ Object
- .parse_endpoint(endpoint) ⇒ Object
Class Method Details
.bind(endpoint, engine) ⇒ Object
    # File 'lib/zzq/transport/mqtt.rb', line 18

    def bind(endpoint, engine, **)
      host, port = parse_endpoint(endpoint)
      host       = "0.0.0.0" if host == "*"
      server     = TCPServer.new(host, port)
      actual     = server.local_address.ip_port
      host_part  = host.include?(":") ? "[#{host}]" : host
      Listener.new("mqtt://#{host_part}:#{actual}", server, actual, engine)
    end
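The listing above rebuilds the advertised endpoint from the port the operating system actually assigned, which matters when the caller asks for port 0. The following is a minimal sketch of that idea using only the standard library. The helper name `bound_endpoint` is hypothetical, and ZZQ's `Listener` and engine are left out entirely.

```ruby
require "socket"

# Hypothetical helper (not part of ZZQ): mirrors the steps in .bind
# but returns the endpoint string and the raw TCPServer.
def bound_endpoint(host, port)
  host = "0.0.0.0" if host == "*"            # wildcard host means all IPv4 interfaces
  server = TCPServer.new(host, port)         # port 0 lets the OS pick a free port
  actual = server.local_address.ip_port      # the port that was really bound
  host_part = host.include?(":") ? "[#{host}]" : host  # bracket IPv6 literals
  ["mqtt://#{host_part}:#{actual}", server]
end

endpoint, server = bound_endpoint("127.0.0.1", 0)
puts endpoint   # mqtt://127.0.0.1:<port chosen by the OS>
server.close
```

Reading the port back from `local_address` means the endpoint string can be handed to clients even when the requested port was 0.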
.connect(endpoint, engine) ⇒ Object
    # File 'lib/zzq/transport/mqtt.rb', line 28

    def connect(endpoint, engine, **)
      host, port = parse_endpoint(endpoint)
      sock = ::Socket.tcp(host, port, connect_timeout: connect_timeout(engine.options))
      engine.handle_connected(IO::Stream::Buffered.wrap(sock), endpoint: endpoint)
    end
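The core of `.connect` is a `Socket.tcp` call with a `connect_timeout:` keyword. The sketch below shows just that step against a throwaway local server, using only the standard library. The `IO::Stream::Buffered` wrapping and the engine callback from the listing are omitted, and the 0.5 second timeout is an illustrative value.

```ruby
require "socket"

# Throwaway local server on an OS-assigned port.
server = TCPServer.new("127.0.0.1", 0)
port   = server.local_address.ip_port

# Client side: same call shape as in .connect, with an illustrative timeout.
sock = Socket.tcp("127.0.0.1", port, connect_timeout: 0.5)

# Server side: pick up the pending connection.
peer = server.accept

sock.write("ping")
sock.close            # closing signals EOF to the peer
data = peer.read      # reads until EOF
puts data             # ping

peer.close
server.close
```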
.connect_timeout(options) ⇒ Object
    # File 'lib/zzq/transport/mqtt.rb', line 35

    def connect_timeout(options)
      ri = options.reconnect_interval
      ri = ri.end if ri.is_a?(Range)
      [ri, 0.5].max
    end
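The timeout is derived from the reconnect interval: a `Range` contributes its upper bound, and the result never drops below 0.5 seconds. A small sketch of that rule follows. The `SketchOptions` struct is a hypothetical stand-in for whatever options object ZZQ actually passes, and the method body is copied from the signature and listing above.

```ruby
# Hypothetical stand-in for the real options object; only the one
# attribute that connect_timeout reads is modelled.
SketchOptions = Struct.new(:reconnect_interval)

def connect_timeout(options)
  ri = options.reconnect_interval
  ri = ri.end if ri.is_a?(Range)   # a range contributes its upper bound
  [ri, 0.5].max                    # floor of half a second
end

p connect_timeout(SketchOptions.new(0.1))       # 0.5 (raised to the floor)
p connect_timeout(SketchOptions.new(2))         # 2
p connect_timeout(SketchOptions.new(0.1..5.0))  # 5.0 (upper bound of the range)
```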
.parse_endpoint(endpoint) ⇒ Object
    # File 'lib/zzq/transport/mqtt.rb', line 42

    def parse_endpoint(endpoint)
      uri = URI.parse(endpoint.sub("mqtt://", "tcp://"))
      [uri.hostname, uri.port]
    end
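To see what this method returns for a few endpoint shapes, the body can be run on its own with only the standard library's `uri` loaded. This is a sketch under that assumption and the hostnames are illustrative. Note that `URI#hostname` strips the brackets from an IPv6 literal, and that a generic URI has no default port, so an endpoint without a port yields `nil` here. Where the documented default of 1883 is applied is not visible in this listing.

```ruby
require "uri"

# Same body as the listing above, defined at top level for experimentation.
def parse_endpoint(endpoint)
  uri = URI.parse(endpoint.sub("mqtt://", "tcp://"))
  [uri.hostname, uri.port]
end

p parse_endpoint("mqtt://broker.example:1883")  # ["broker.example", 1883]
p parse_endpoint("mqtt://[::1]:8883")           # ["::1", 8883]
p parse_endpoint("mqtt://broker.example")       # ["broker.example", nil]
```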