Module: OMQ::Transport::UDP
- Defined in: lib/omq/transport/udp.rb
Overview
UDP transport for RADIO/DISH sockets.
Connectionless, datagram-based. No ZMTP handshake. DISH binds and receives; RADIO connects and sends.
Wire format per datagram:
flags (1 byte = 0x01) | group_size (1 byte) | group (n bytes) | body
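To make the layout concrete, the sketch below builds one datagram by hand and returns each field as a hex string. The helper name `dump_fields` is hypothetical and not part of OMQ; it only mirrors the wire format described above.

```ruby
# Hypothetical helper (not part of OMQ): builds one datagram following the
# layout above and returns each field as a hex string.
def dump_fields(group, body)
  g = group.b # binary copies, so sizes are counted in bytes
  b = body.b
  datagram = [0x01, g.bytesize].pack("CC") + g + b

  {
    flags:      datagram.byteslice(0, 1).unpack1("H*"),
    group_size: datagram.byteslice(1, 1).unpack1("H*"),
    group:      datagram.byteslice(2, g.bytesize).unpack1("H*"),
    body:       datagram.byteslice(2 + g.bytesize, b.bytesize).unpack1("H*"),
    total:      datagram.bytesize
  }
end

p dump_fields("news", "hi")
```

For the group "news" and the body "hi", the fields come out as flags `01`, group_size `04`, group `6e657773`, and body `6869`, for a total of 8 bytes.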
Defined Under Namespace
Classes: DishConnection, Listener, RadioConnection
Constant Summary
- MAX_DATAGRAM_SIZE = 65507
Class Method Summary
- .bind(endpoint, engine) ⇒ Listener
  Binds a UDP server for a DISH socket.
- .connect(endpoint, engine) ⇒ void
  Connects a UDP sender for a RADIO socket.
- .decode_datagram(data) ⇒ Array(String, String)?
  Decodes a UDP datagram into [group, body].
- .encode_datagram(group, body) ⇒ String
  Encodes a group + body into a UDP datagram.
- .parse_endpoint(endpoint) ⇒ Array(String, Integer)
Class Method Details
.bind(endpoint, engine) ⇒ Listener
Binds a UDP server for a DISH socket.
    # File 'lib/omq/transport/udp.rb', line 27
    def bind(endpoint, engine)
      host, port = parse_endpoint(endpoint)
      host = "0.0.0.0" if host == "*"
      socket = UDPSocket.new
      socket.bind(host, port)
      Listener.new(endpoint, socket, engine)
    end
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects a UDP sender for a RADIO socket.
    # File 'lib/omq/transport/udp.rb', line 42
    def connect(endpoint, engine)
      host, port = parse_endpoint(endpoint)
      socket = UDPSocket.new
      conn = RadioConnection.new(socket, host, port)
      engine.connection_ready(conn, endpoint: endpoint)
    end
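The internals of Listener and RadioConnection are not shown on this page, so the following is only a sketch of the underlying pattern using plain `UDPSocket` calls from Ruby's standard library: one socket binds and receives (the DISH role), another sends without any handshake (the RADIO role). The method name `loopback_round_trip` is hypothetical.

```ruby
require "socket"

# Hypothetical helper (not part of OMQ): sends one payload over loopback
# UDP and returns what the bound socket received, or nil on timeout.
def loopback_round_trip(payload)
  receiver = UDPSocket.new
  receiver.bind("127.0.0.1", 0) # port 0 lets the OS pick a free port
  port = receiver.addr[1]

  sender = UDPSocket.new
  sender.send(payload, 0, "127.0.0.1", port) # connectionless: no handshake

  # UDP gives no delivery guarantee, so wait at most one second.
  return nil unless IO.select([receiver], nil, nil, 1.0)

  data, _sender_addr = receiver.recvfrom(65_507)
  data
ensure
  sender&.close
  receiver&.close
end

p loopback_round_trip("\x01\x04newshi".b)
```

The receive buffer size of 65,507 matches MAX_DATAGRAM_SIZE, so a datagram of the maximum documented size would not be truncated by `recvfrom`.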
.decode_datagram(data) ⇒ Array(String, String)?
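Decodes a UDP datagram into [group, body]. Returns nil when the datagram is shorter than the 2-byte header, the low flags bit is not set, or the group is truncated.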
    # File 'lib/omq/transport/udp.rb', line 77
    def decode_datagram(data)
      return nil if data.bytesize < 2
      return nil unless data.getbyte(0) & 0x01 == 0x01
      gs = data.getbyte(1)
      return nil if data.bytesize < 2 + gs
      group = data.byteslice(2, gs)
      body = data.byteslice(2 + gs, data.bytesize - 2 - gs)
      [group, body]
    end
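The three early returns are easier to follow with concrete inputs. The sketch below is a standalone copy of the listing above, wrapped in a hypothetical `DecodeSketch` module so that it runs without OMQ loaded; the inline comments are added for illustration.

```ruby
# Standalone copy of the listing above, wrapped in a hypothetical module
# so the example runs without OMQ loaded.
module DecodeSketch
  module_function

  def decode_datagram(data)
    return nil if data.bytesize < 2                  # too short for flags + group_size
    return nil unless data.getbyte(0) & 0x01 == 0x01 # low flags bit not set
    gs = data.getbyte(1)
    return nil if data.bytesize < 2 + gs             # group truncated
    [data.byteslice(2, gs), data.byteslice(2 + gs, data.bytesize - 2 - gs)]
  end
end

p DecodeSketch.decode_datagram("\x01\x04newshi".b) # valid: group "news", body "hi"
p DecodeSketch.decode_datagram("\x01".b)           # nil: shorter than the header
p DecodeSketch.decode_datagram("\x00\x04newshi".b) # nil: flags bit clear
p DecodeSketch.decode_datagram("\x01\x09news".b)   # nil: group_size exceeds the data
p DecodeSketch.decode_datagram("\x01\x04news".b)   # valid: empty body
```

Only the low bit of the flags byte is tested, so a flags byte such as 0x03 also passes this check. A datagram with no bytes after the group decodes to an empty body rather than nil.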
.encode_datagram(group, body) ⇒ String
Encodes a group + body into a UDP datagram.
    # File 'lib/omq/transport/udp.rb', line 65
    def encode_datagram(group, body)
      g = group.b
      b = body.b
      [0x01, g.bytesize].pack("CC") + g + b
    end
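Because group_size occupies a single byte, `pack("C")` keeps only the low 8 bits of the length, so a 256-byte group would be written with a group_size of 0. This page does not show whether OMQ validates sizes elsewhere, so the following is only a sketch of one possible guard. The module `EncodeSketch`, the method `checked_encode`, and the constant `MAX_GROUP_SIZE` are hypothetical; `MAX_DATAGRAM_SIZE` reuses the value documented above, which equals the largest UDP payload over IPv4 (65,535 minus a 20-byte IP header and an 8-byte UDP header).

```ruby
# Hypothetical guard (not part of OMQ): validates sizes before encoding.
module EncodeSketch
  MAX_DATAGRAM_SIZE = 65_507 # same value as the constant documented above
  MAX_GROUP_SIZE    = 255    # group_size is a single unsigned byte

  module_function

  def checked_encode(group, body)
    g = group.b
    b = body.b
    if g.bytesize > MAX_GROUP_SIZE
      raise ArgumentError, "group exceeds #{MAX_GROUP_SIZE} bytes"
    end

    datagram = [0x01, g.bytesize].pack("CC") + g + b
    if datagram.bytesize > MAX_DATAGRAM_SIZE
      raise ArgumentError, "datagram exceeds #{MAX_DATAGRAM_SIZE} bytes"
    end

    datagram
  end
end

p EncodeSketch.checked_encode("news", "hi")

begin
  EncodeSketch.checked_encode("g" * 256, "hi")
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```

Raising early keeps an oversized group from producing a datagram that a receiver would split at the wrong offset.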
.parse_endpoint(endpoint) ⇒ Array(String, Integer)
Parses an endpoint URI into [host, port].
    # File 'lib/omq/transport/udp.rb', line 53
    def parse_endpoint(endpoint)
      uri = URI.parse(endpoint)
      [uri.hostname, uri.port]
    end
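A few sample endpoints show what this returns. The sketch below is a standalone copy of the listing, wrapped in a hypothetical `EndpointSketch` module; the `udp://` endpoint strings are assumed examples, not taken from the OMQ documentation.

```ruby
require "uri"

# Standalone copy of the listing above, wrapped in a hypothetical module
# so the example runs without OMQ loaded.
module EndpointSketch
  module_function

  def parse_endpoint(endpoint)
    uri = URI.parse(endpoint)
    [uri.hostname, uri.port]
  end
end

p EndpointSketch.parse_endpoint("udp://127.0.0.1:5556") # IPv4 host
p EndpointSketch.parse_endpoint("udp://[::1]:5556")     # IPv6 host, brackets stripped
p EndpointSketch.parse_endpoint("udp://*:5556")         # wildcard host, as handled by .bind
p EndpointSketch.parse_endpoint("udp://localhost")      # no port given
```

Using `hostname` rather than `host` strips the square brackets from IPv6 literals, which is the form `UDPSocket#bind` expects. When the endpoint omits the port, `uri.port` is nil for a scheme with no default port, so callers may want to check for that before binding.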