Class: OMQ::Transport::UDP::DishConnection
- Inherits: Object
- Defined in: lib/omq/transport/udp.rb
Overview
Incoming UDP connection for DISH sockets.
Tracks joined groups locally. JOIN/LEAVE commands from the DISH routing strategy are intercepted via #send_command and never transmitted on the wire.
Instance Method Summary
- #close ⇒ Object
  Closes the underlying UDP socket.
- #curve? ⇒ Boolean
  Whether this connection uses CURVE encryption.
- #initialize(socket) ⇒ DishConnection (constructor)
  A new instance of DishConnection.
- #receive_message ⇒ Array<String>
  Receives one matching datagram, blocking until available.
- #send_command(cmd) ⇒ Object
  Handles JOIN/LEAVE commands locally; nothing is sent on the wire.
Constructor Details
#initialize(socket) ⇒ DishConnection
Returns a new instance of DishConnection.
    # File 'lib/omq/transport/udp.rb', line 149
    def initialize(socket)
      @socket = socket
      @groups = Set.new
    end
Instance Method Details
#close ⇒ Object
Closes the underlying UDP socket.
    # File 'lib/omq/transport/udp.rb', line 200
    def close
      @socket.close rescue nil
    end
#curve? ⇒ Boolean
Whether this connection uses CURVE encryption.
    # File 'lib/omq/transport/udp.rb', line 194
    def curve?
      false
    end
#receive_message ⇒ Array<String>
Receives one matching datagram, blocking until available.
Async-safe: rescues IO::WaitReadable and yields to the fiber scheduler via #wait_readable.
    # File 'lib/omq/transport/udp.rb', line 162
    def receive_message
      loop do
        data, = @socket.recvfrom_nonblock(MAX_DATAGRAM_SIZE)
        parts = UDP.decode_datagram(data)
        next unless parts
        group, body = parts
        next unless @groups.include?(group.b)
        return [group.b.freeze, body.b.freeze]
      rescue IO::WaitReadable
        @socket.wait_readable
        retry
      end
    end
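The receive loop above can be tried outside the library with plain Ruby sockets. The following is a minimal sketch, not OMQ's implementation: `encode_datagram`, `decode_datagram`, and `receive_matching` are hypothetical helpers, the one-byte length-prefix framing is an assumption made for illustration (the real `UDP.decode_datagram` format is not shown on this page), and the `MAX_DATAGRAM_SIZE` value is assumed.

```ruby
require "socket"
require "set"
require "io/wait"

# Assumed value; the real constant is not shown on this page.
MAX_DATAGRAM_SIZE = 65_507

# Hypothetical framing: 1-byte group length, group bytes, then the body.
def encode_datagram(group, body)
  [group.bytesize].pack("C") + group.b + body.b
end

# Returns [group, body], or nil when the datagram is malformed.
def decode_datagram(data)
  return nil if data.empty?
  len = data.getbyte(0)
  return nil if data.bytesize < 1 + len
  [data.byteslice(1, len), data.byteslice(1 + len, data.bytesize - 1 - len)]
end

# Same control flow as #receive_message: drop undecodable or unjoined
# datagrams, and wait for readability when nothing is queued.
def receive_matching(socket, groups)
  loop do
    data, = socket.recvfrom_nonblock(MAX_DATAGRAM_SIZE)
    parts = decode_datagram(data)
    next unless parts
    group, body = parts
    next unless groups.include?(group.b)
    return [group.b.freeze, body.b.freeze]
  rescue IO::WaitReadable
    socket.wait_readable
    retry
  end
end

receiver = UDPSocket.new
receiver.bind("127.0.0.1", 0)           # let the OS pick a free port
port = receiver.local_address.ip_port

sender = UDPSocket.new
sender.send(encode_datagram("news", "skipped"), 0, "127.0.0.1", port)  # group not joined
sender.send(encode_datagram("weather", "sunny"), 0, "127.0.0.1", port)

group, body = receive_matching(receiver, Set["weather".b])

sender.close
receiver.close
```

Only the datagram addressed to a joined group is returned; the `news` datagram is read and discarded inside the loop. Both returned strings are binary-encoded and frozen, matching the `group.b.freeze` and `body.b.freeze` calls in the original method.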
#send_command(cmd) ⇒ Object
Handles JOIN/LEAVE commands locally; nothing is sent on the wire.
    # File 'lib/omq/transport/udp.rb', line 181
    def send_command(cmd)
      case cmd.name
      when "JOIN"
        @groups << cmd.data.b.freeze
      when "LEAVE"
        @groups.delete(cmd.data.b)
      end
    end
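To illustrate the local group bookkeeping, here is a small sketch that reproduces the JOIN/LEAVE handling in isolation. `Command` and `GroupTracker` are hypothetical stand-ins introduced for this example; the only assumption about the real command object is that it responds to `#name` and `#data`, as the method body above suggests.

```ruby
require "set"

# Hypothetical stand-in for the command object; only #name and #data
# are assumed, matching how send_command reads them.
Command = Struct.new(:name, :data)

# Hypothetical class that isolates the group-tracking logic.
class GroupTracker
  attr_reader :groups

  def initialize
    @groups = Set.new
  end

  # Mirrors the case statement above: membership changes stay in memory
  # and nothing is written to a socket.
  def send_command(cmd)
    case cmd.name
    when "JOIN"
      @groups << cmd.data.b.freeze
    when "LEAVE"
      @groups.delete(cmd.data.b)
    end
  end
end

tracker = GroupTracker.new
tracker.send_command(Command.new("JOIN", "weather"))
tracker.send_command(Command.new("JOIN", "weather"))   # duplicate JOIN leaves the set unchanged
tracker.send_command(Command.new("PING", "ignored"))   # other command names fall through
joined_count = tracker.groups.size
joined = tracker.groups.include?("weather".b)

tracker.send_command(Command.new("LEAVE", "weather"))
left = tracker.groups.empty?
```

Group names are stored as binary strings, so lookups should also use the binary form (`"weather".b`), as `#receive_message` does with `group.b`.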