Class: NNQ::Routing::Pair
- Inherits: Object
- Includes: SendPump
- Defined in: lib/nnq/routing/pair.rb
Overview
PAIR0: exclusive bidirectional channel with a single peer.
Wire format: no SP header. Body on the wire is exactly the user payload (same as push0/pull0). Per nng’s pair0, when a second peer tries to connect while one is already paired, the new pipe is rejected — first peer wins.
Send side: shared send queue + 1 pump (reuses SendPump). The pump infrastructure is identical to PUSH; PAIR just never has more than one pump because it never has more than one peer.
Recv side: messages fed by the engine’s recv loop into a local Async::Queue. Unbounded — TCP throttles the peer.
Constant Summary
Constants included from SendPump
SendPump::BATCH_BYTE_CAP, SendPump::BATCH_MSG_CAP
Instance Method Summary
- #close ⇒ Object
- #close_read ⇒ Object
  Wake recv side without tearing down the send pump.
- #connection_added(conn) ⇒ Object
  First-pipe-wins.
- #connection_removed(conn) ⇒ Object
- #enqueue(body, _conn = nil) ⇒ Object
  Called by the recv loop with each message off the wire.
- #initialize(engine) ⇒ Pair (constructor)
  A new instance of Pair.
- #receive ⇒ String?
  Message body, or nil once the socket is closed.
- #send(body) ⇒ Object
Methods included from SendPump
#remove_send_pump_for, #send_queue_drained?
Constructor Details
#initialize(engine) ⇒ Pair
Returns a new instance of Pair.
    # File 'lib/nnq/routing/pair.rb', line 26
    def initialize(engine)
      init_send_pump(engine)
      @recv_queue = Async::Queue.new
      @peer = nil
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/nnq/routing/pair.rb', line 68
    def close
      super
      @recv_queue.enqueue(nil) # wake any waiter
    end
#close_read ⇒ Object
Wake recv side without tearing down the send pump.
    # File 'lib/nnq/routing/pair.rb', line 75
    def close_read
      @recv_queue.enqueue(nil)
    end
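The wake-up in #close and #close_read relies on a nil sentinel: a receiver blocked on an empty queue returns as soon as nil is enqueued. The following is a minimal sketch of that idea, not the library's code. It assumes the standard library's Queue as a stand-in for Async::Queue so it runs without the async gem, and ToyRecvSide is a hypothetical name.

```ruby
# Stand-in sketch: Queue (stdlib) replaces Async::Queue.
# ToyRecvSide is hypothetical and not part of nnq.
class ToyRecvSide
  def initialize
    @recv_queue = Queue.new
  end

  # Mirrors #receive: waits for a message body or the nil sentinel.
  def receive
    @recv_queue.pop
  end

  # Mirrors #close_read: enqueue nil so a waiting receiver returns.
  def close_read
    @recv_queue.push(nil)
  end
end

side = ToyRecvSide.new
waiter = Thread.new { side.receive } # waits for a body or the sentinel
side.close_read                      # enqueue the nil sentinel
woken_with = waiter.value            # join the thread and take its result
```

In this stand-in each sentinel releases a single waiter, so waking several blocked receivers would take one nil per receiver.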
#connection_added(conn) ⇒ Object
First-pipe-wins. Raising ConnectionRejected tells the ConnectionLifecycle to tear down the just-registered connection without ever exposing it to pumps.
    # File 'lib/nnq/routing/pair.rb', line 54
    def connection_added(conn)
      raise ConnectionRejected, "PAIR socket already has a peer" if @peer
      @peer = conn
      spawn_send_pump_for(conn)
    end
#connection_removed(conn) ⇒ Object
    # File 'lib/nnq/routing/pair.rb', line 62
    def connection_removed(conn)
      remove_send_pump_for(conn)
      @peer = nil if @peer == conn
    end
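Taken together, #connection_added and #connection_removed behave like a single-occupancy slot. The sketch below models only that bookkeeping, under stated assumptions: ToyPairSlot and ToyRejected are hypothetical stand-ins (the real class raises ConnectionRejected and also manages send pumps), and symbols stand in for connection objects.

```ruby
# Hypothetical stand-ins; neither class is part of nnq.
class ToyRejected < StandardError; end

class ToyPairSlot
  attr_reader :peer

  def initialize
    @peer = nil
  end

  # First-pipe-wins: refuse any connection while a peer is set.
  def connection_added(conn)
    raise ToyRejected, "PAIR socket already has a peer" if @peer
    @peer = conn
  end

  # Only the current peer can free the slot.
  def connection_removed(conn)
    @peer = nil if @peer == conn
  end
end

slot = ToyPairSlot.new
slot.connection_added(:first)

second_rejected =
  begin
    slot.connection_added(:second)
    false
  rescue ToyRejected
    true
  end

slot.connection_removed(:second) # not the peer: slot stays occupied
peer_after_stray_removal = slot.peer

slot.connection_removed(:first)  # the peer leaves: slot is free again
slot.connection_added(:third)    # a new peer is now accepted
```

The `@peer == conn` guard matters in this model because a removal for some other connection must not evict the established peer.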
#enqueue(body, _conn = nil) ⇒ Object
Called by the recv loop with each message off the wire.
    # File 'lib/nnq/routing/pair.rb', line 46
    def enqueue(body, _conn = nil)
      @recv_queue.enqueue(body)
    end
#receive ⇒ String?
Returns the message body, or nil once the socket is closed.
    # File 'lib/nnq/routing/pair.rb', line 40
    def receive
      @recv_queue.dequeue
    end
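Because #receive returns nil once the socket is closed, a consumer can loop until it sees nil. A minimal sketch of such a loop follows. It assumes the standard library's Queue in place of Async::Queue, and the two lambdas are hypothetical stand-ins for #enqueue and #receive.

```ruby
# Stand-in sketch: Queue (stdlib) replaces Async::Queue; the lambdas
# below are hypothetical stand-ins for Pair#enqueue and Pair#receive.
recv_queue = Queue.new
enqueue = ->(body) { recv_queue.push(body) }
receive = -> { recv_queue.pop }

enqueue.call("hello")
enqueue.call("")      # an empty body is still a message
enqueue.call("world")
enqueue.call(nil)     # what close / close_read would enqueue

bodies = []
while (body = receive.call)
  bodies << body      # runs for every non-nil body, including ""
end
```

An empty string is truthy in Ruby, so an empty message does not end the loop; only the nil sentinel does. In this stand-in, bodies queued ahead of the sentinel are still delivered in order.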
#send(body) ⇒ Object
    # File 'lib/nnq/routing/pair.rb', line 34
    def send(body)
      enqueue_for_send(body)
    end
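The send side described in the overview (one shared send queue drained by one pump) can be pictured roughly as below. This is an illustrative sketch only, not SendPump's implementation: it uses a plain thread and the standard library's Queue, the `written` array stands in for the peer connection, and the nil stop signal is an assumption made for the example. Batching via BATCH_MSG_CAP and BATCH_BYTE_CAP is left out.

```ruby
# Illustrative stand-in for "shared send queue + 1 pump".
# Nothing here is nnq API; names and the stop signal are assumptions.
send_queue = Queue.new
written = [] # stands in for bytes written to the single peer

pump = Thread.new do
  # Drain the queue in FIFO order until the (assumed) nil stop signal.
  while (body = send_queue.pop)
    written << body
  end
end

# What #send(body) amounts to in this model: enqueue and return.
send_queue.push("ping")
send_queue.push("pong")

send_queue.push(nil) # stop the toy pump
pump.join
```

With a single consumer, the order in which bodies reach the peer matches the order in which they were enqueued.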