Class: OMQ::Transport::UDP::DishConnection
- Inherits:
-
Object
- Object
- OMQ::Transport::UDP::DishConnection
- Defined in:
- lib/omq/transport/udp.rb
Overview
Incoming UDP connection for DISH sockets.
Tracks joined groups locally. JOIN/LEAVE commands from the DISH routing strategy are intercepted via #send_command and never transmitted on the wire.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the underlying UDP socket.
-
#curve? ⇒ Boolean
Whether this connection uses CURVE encryption.
-
#initialize(socket) ⇒ DishConnection
constructor
A new instance of DishConnection.
-
#receive_message ⇒ Array<String>
Receives one matching datagram, blocking until available.
-
#send_command(cmd) ⇒ Object
Handles JOIN/LEAVE commands locally; nothing is sent on the wire.
Constructor Details
#initialize(socket) ⇒ DishConnection
Returns a new instance of DishConnection.
183 184 185 186 |
# File 'lib/omq/transport/udp.rb', line 183 def initialize(socket) @socket = socket @groups = Set.new end |
Instance Method Details
#close ⇒ Object
Closes the underlying UDP socket.
234 235 236 |
# File 'lib/omq/transport/udp.rb', line 234 def close @socket.close rescue nil end |
#curve? ⇒ Boolean
Whether this connection uses CURVE encryption.
228 229 230 |
# File 'lib/omq/transport/udp.rb', line 228 def curve? false end |
#receive_message ⇒ Array<String>
Receives one matching datagram, blocking until available.
Async-safe: rescues IO::WaitReadable and yields to the fiber scheduler via #wait_readable.
196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/omq/transport/udp.rb', line 196 def loop do data, = @socket.recvfrom_nonblock(MAX_DATAGRAM_SIZE) parts = UDP.decode_datagram(data) next unless parts group, body = parts next unless @groups.include?(group.b) return [group, body] rescue IO::WaitReadable @socket.wait_readable retry end end |
#send_command(cmd) ⇒ Object
Handles JOIN/LEAVE commands locally; nothing is sent on the wire.
215 216 217 218 219 220 221 222 |
# File 'lib/omq/transport/udp.rb', line 215 def send_command(cmd) case cmd.name when "JOIN" @groups << cmd.data.b.freeze when "LEAVE" @groups.delete(cmd.data.b) end end |