Class: NNQ::Routing::Pull

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/pull.rb

Overview

PULL side: an unbounded queue of received messages. Per-connection recv fibers (spawned by the Engine when each pipe is established) call #enqueue on each message; user code calls #receive.

No HWM, no prefetch buffer — TCP throttles the senders directly via the kernel buffer.

Instance Method Summary collapse

Constructor Details

#initializePull

Returns a new instance of Pull.



15
16
17
# File 'lib/nnq/routing/pull.rb', line 15

def initialize
  @queue = Async::Queue.new
end

Instance Method Details

#closeObject

Wakes any waiters with nil so receive returns from a closed socket.



33
34
35
# File 'lib/nnq/routing/pull.rb', line 33

def close
  @queue.enqueue(nil)
end

#close_readObject

Wakes any waiters with nil, leaving the send side untouched (PULL has no send side — close_read is identical to close here, but kept separate for the ‘Socket#close_read` contract).



41
42
43
# File 'lib/nnq/routing/pull.rb', line 41

def close_read
  @queue.enqueue(nil)
end

#enqueue(body, _conn = nil) ⇒ Object



20
21
22
# File 'lib/nnq/routing/pull.rb', line 20

def enqueue(body, _conn = nil)
  @queue.enqueue(body)
end

#receiveString?

Returns message body, or nil if the queue was closed.

Returns:

  • (String, nil)

    message body, or nil if the queue was closed



26
27
28
# File 'lib/nnq/routing/pull.rb', line 26

def receive
  @queue.dequeue
end