Class: NNQ::Routing::Pair

Inherits:
Object
  • Object
show all
Includes:
SendPump
Defined in:
lib/nnq/routing/pair.rb

Overview

PAIR0: exclusive bidirectional channel with a single peer.

Wire format: no SP header. Body on the wire is exactly the user payload (same as push0/pull0). Per nng’s pair0, when a second peer tries to connect while one is already paired, the new pipe is rejected — first peer wins.

Send side: shared send queue + 1 pump (reuses SendPump). The pump infrastructure is identical to PUSH; PAIR just never has more than one pump because it never has more than one peer.

Recv side: messages fed by the engine’s recv loop into a local Async::Queue. Unbounded — TCP throttles the peer.

Constant Summary

Constants included from SendPump

SendPump::BATCH_BYTE_CAP, SendPump::BATCH_MSG_CAP

Instance Method Summary collapse

Methods included from SendPump

#remove_send_pump_for, #send_queue_drained?

Constructor Details

#initialize(engine) ⇒ Pair

Returns a new instance of Pair.



26
27
28
29
30
# File 'lib/nnq/routing/pair.rb', line 26

def initialize(engine)
  init_send_pump(engine)
  @recv_queue = Async::Queue.new
  @peer       = nil
end

Instance Method Details

#closeObject



68
69
70
71
# File 'lib/nnq/routing/pair.rb', line 68

def close
  super
  @recv_queue.enqueue(nil) # wake any waiter
end

#close_readObject

Wake recv side without tearing down the send pump.



75
76
77
# File 'lib/nnq/routing/pair.rb', line 75

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object

First-pipe-wins. Raising ConnectionRejected tells the ConnectionLifecycle to tear down the just-registered connection without ever exposing it to pumps.

Raises:



54
55
56
57
58
59
# File 'lib/nnq/routing/pair.rb', line 54

def connection_added(conn)
  raise ConnectionRejected, "PAIR socket already has a peer" if @peer

  @peer = conn
  spawn_send_pump_for(conn)
end

#connection_removed(conn) ⇒ Object



62
63
64
65
# File 'lib/nnq/routing/pair.rb', line 62

def connection_removed(conn)
  remove_send_pump_for(conn)
  @peer = nil if @peer == conn
end

#enqueue(body, _conn = nil) ⇒ Object

Called by the recv loop with each message off the wire.



46
47
48
# File 'lib/nnq/routing/pair.rb', line 46

def enqueue(body, _conn = nil)
  @recv_queue.enqueue(body)
end

#receiveString?

Returns message body, or nil once the socket is closed.

Returns:

  • (String, nil)

    message body, or nil once the socket is closed



40
41
42
# File 'lib/nnq/routing/pair.rb', line 40

def receive
  @recv_queue.dequeue
end

#send(body) ⇒ Object

Parameters:

  • body (String)


34
35
36
# File 'lib/nnq/routing/pair.rb', line 34

def send(body)
  enqueue_for_send(body)
end