Module: OMQ::Transport::UDP

Defined in:
lib/omq/transport/udp.rb

Overview

UDP transport for RADIO/DISH sockets.

Connectionless, datagram-based. No ZMTP handshake. DISH binds and receives; RADIO connects and sends.

Wire format per datagram:

flags (1 byte = 0x01) | group_size (1 byte) | group (n bytes) | body
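As a rough illustration of this layout, the sketch below assembles one datagram by hand and splits it apart again with `String#unpack`. The group name and body are made-up sample values, and the code does not call into OMQ.

```ruby
# Assemble one datagram by hand, following the layout above.
group = "weather".b
body  = "sunny".b
datagram = [0x01, group.bytesize].pack("CC") + group + body

# Split it back into the two header bytes and the remainder.
flags, group_size, rest = datagram.unpack("CCa*")

puts format("flags=0x%02x group_size=%d", flags, group_size) # flags=0x01 group_size=7
puts rest.byteslice(0, group_size)                            # weather
puts rest.byteslice(group_size, rest.bytesize - group_size)   # sunny
```

Because group_size occupies a single byte, a group name can be at most 255 bytes long.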

Defined Under Namespace

Classes: DishConnection, Listener, RadioConnection

Constant Summary

MAX_DATAGRAM_SIZE = 65507

Class Method Details

.bind(endpoint, engine) ⇒ Listener

Binds a UDP server for a DISH socket.

Parameters:

  • endpoint (String)

    e.g. “udp://*:5555” or “udp://127.0.0.1:5555”

  • engine (Engine)

Returns:

  • (Listener)

# File 'lib/omq/transport/udp.rb', line 27

def bind(endpoint, engine)
  host, port = parse_endpoint(endpoint)
  host = "0.0.0.0" if host == "*"
  socket = UDPSocket.new
  socket.bind(host, port)
  Listener.new(endpoint, socket, engine)
end

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects a UDP sender for a RADIO socket.

Parameters:

  • endpoint (String)

    e.g. “udp://127.0.0.1:5555”

  • engine (Engine)


# File 'lib/omq/transport/udp.rb', line 42

def connect(endpoint, engine)
  host, port = parse_endpoint(endpoint)
  socket = UDPSocket.new
  conn = RadioConnection.new(socket, host, port)
  engine.connection_ready(conn, endpoint: endpoint)
end
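To make the bind/connect split concrete, here is a minimal loopback sketch that uses only Ruby's standard `UDPSocket`. It does not use `Listener` or `RadioConnection`, whose internals are not shown on this page; the port is chosen by the OS and the payload is a sample value.

```ruby
require "socket"

# Receiving side: bind to loopback and let the OS pick a free port.
receiver = UDPSocket.new
receiver.bind("127.0.0.1", 0)
port = receiver.addr[1]

# Sending side: no handshake is needed before the first datagram.
sender = UDPSocket.new
payload = [0x01, 4].pack("CC") + "news" + "hello"
sender.send(payload, 0, "127.0.0.1", port)

# Wait up to one second, then read a single datagram.
received = nil
if IO.select([receiver], nil, nil, 1)
  received, _addr = receiver.recvfrom(65_507)
end

sender.close
receiver.close
```

The sender never learns whether anyone is listening: if the receiver were not bound, `send` would typically still succeed and the datagram would simply be lost.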

.decode_datagram(data) ⇒ Array(String, String)?

Decodes a UDP datagram into [group, body].

Parameters:

  • data (String)

    raw datagram bytes

Returns:

  • (Array(String, String), nil)

    nil if malformed



# File 'lib/omq/transport/udp.rb', line 77

def decode_datagram(data)
  return nil if data.bytesize < 2
  return nil unless data.getbyte(0) & 0x01 == 0x01
  gs = data.getbyte(1)
  return nil if data.bytesize < 2 + gs
  group = data.byteslice(2, gs)
  body  = data.byteslice(2 + gs, data.bytesize - 2 - gs)
  [group, body]
end
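The three guard clauses are easiest to see with concrete inputs. The sketch below copies the method body shown above into a local method named `decode` (a stand-in so the example runs without OMQ loaded) and feeds it one well-formed datagram plus several malformed ones.

```ruby
# Local copy of the method body shown above; not the OMQ module itself.
def decode(data)
  return nil if data.bytesize < 2
  return nil unless data.getbyte(0) & 0x01 == 0x01
  gs = data.getbyte(1)
  return nil if data.bytesize < 2 + gs
  [data.byteslice(2, gs), data.byteslice(2 + gs, data.bytesize - 2 - gs)]
end

# Helper for building the 2-byte header (illustrative only).
header = ->(flags, size) { [flags, size].pack("CC") }

p decode(header.(0x01, 3) + "abchello") # ["abc", "hello"]
p decode(header.(0x01, 0))              # ["", ""]
p decode("\x01".b)                       # nil: shorter than the 2-byte header
p decode(header.(0x00, 3) + "abc")      # nil: flag bit 0 is not set
p decode(header.(0x01, 5) + "ab")       # nil: group_size exceeds the remaining bytes
```

Note that the flag check tests only bit 0, so a flags byte such as 0x03 is also accepted by this logic.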

.encode_datagram(group, body) ⇒ String

Encodes a group + body into a UDP datagram.

Parameters:

  • group (String)
  • body (String)

Returns:

  • (String)

    binary datagram



# File 'lib/omq/transport/udp.rb', line 65

def encode_datagram(group, body)
  g = group.b
  b = body.b
  [0x01, g.bytesize].pack("CC") + g + b
end
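The method as listed performs no size checks, and this page does not show whether callers validate sizes elsewhere. The sketch below shows one way such checks could look. `checked_encode` is a hypothetical helper, not part of OMQ, and the constant is redefined locally with the same value as `MAX_DATAGRAM_SIZE` above.

```ruby
# Same value as the documented constant: the largest UDP payload over IPv4
# (65,535 minus a 20-byte IP header and an 8-byte UDP header).
MAX_DATAGRAM_SIZE = 65_507

# Hypothetical helper (not part of OMQ): encode with explicit size checks.
def checked_encode(group, body)
  g = group.b
  b = body.b
  # group_size is a single byte on the wire, so 255 is the hard limit.
  raise ArgumentError, "group exceeds 255 bytes" if g.bytesize > 255

  datagram = [0x01, g.bytesize].pack("CC") + g + b
  if datagram.bytesize > MAX_DATAGRAM_SIZE
    raise ArgumentError, "datagram exceeds #{MAX_DATAGRAM_SIZE} bytes"
  end
  datagram
end

p checked_encode("abc", "hi").bytes # [1, 3, 97, 98, 99, 104, 105]
```

Sizes are measured in bytes rather than characters, so a group name containing multi-byte UTF-8 characters reaches the limit sooner than its character count suggests.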

.parse_endpoint(endpoint) ⇒ Array(String, Integer)

Splits an endpoint URI into its host and port.

Parameters:

  • endpoint (String)

Returns:

  • (Array(String, Integer))


# File 'lib/omq/transport/udp.rb', line 53

def parse_endpoint(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end
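A few sample endpoints show what this parsing yields. The sketch below copies the method body into a local method named `parse` (a stand-in so it runs without OMQ loaded); the endpoints are illustrative.

```ruby
require "uri"

# Local copy of the method body shown above; not the OMQ module itself.
def parse(endpoint)
  uri = URI.parse(endpoint)
  [uri.hostname, uri.port]
end

p parse("udp://127.0.0.1:5555") # ["127.0.0.1", 5555]
p parse("udp://*:5555")         # ["*", 5555]
p parse("udp://[::1]:5555")     # ["::1", 5555]
p parse("udp://127.0.0.1")      # ["127.0.0.1", nil]
```

`URI#hostname` strips the brackets from IPv6 literals, and the port comes back as `nil` when the endpoint omits it, since the `udp` scheme has no default port registered with Ruby's URI library.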