Class: NNQ::Routing::Pull

Inherits:
Object
Defined in:
lib/nnq/routing/pull.rb

Overview

PULL side: an unbounded queue of received messages. Per-connection recv fibers (spawned by the Engine when each pipe is established) call #enqueue on each message; user code calls #receive.

No high-water mark (HWM) and no prefetch buffer: TCP throttles the senders directly via the kernel buffer.
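The flow described above (several receivers feeding one unbounded queue that user code drains) can be sketched roughly as follows. This is an illustration only, not the library's implementation: it assumes stdlib `Thread::Queue` as a stand-in for `Async::Queue`, threads as a stand-in for the per-connection recv fibers, and a hypothetical class name `PullSketch`.

```ruby
# Stand-in sketch: Thread::Queue (stdlib) plays the role of Async::Queue,
# and threads play the role of the per-connection recv fibers.
# PullSketch is a hypothetical name, not part of NNQ.
class PullSketch
  def initialize
    @queue = Thread::Queue.new # unbounded, like the queue described above
  end

  # Called by each (simulated) per-connection receiver.
  def enqueue(body, _conn = nil)
    @queue.push(body)
  end

  # Called by user code; blocks until a message is available.
  def receive
    @queue.pop
  end
end

pull = PullSketch.new

# Two simulated connections, each delivering two messages.
producers = %w[a b].map do |name|
  Thread.new do
    2.times { |i| pull.enqueue("#{name}#{i}") }
  end
end
producers.each(&:join)

# Arrival order across connections is not guaranteed, so sort before printing.
received = Array.new(4) { pull.receive }
puts received.sort.inspect
```

Messages from different connections interleave in arrival order, so a consumer should not rely on any ordering across senders.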

Constructor Details

#initialize ⇒ Pull

Returns a new instance of Pull.



# File 'lib/nnq/routing/pull.rb', line 15

def initialize
  @queue = Async::Queue.new
end

Instance Method Details

#close ⇒ Object

Wakes any waiters with nil so receive returns from a closed socket.



# File 'lib/nnq/routing/pull.rb', line 41

def close
  @queue.enqueue(nil)
end
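The nil-sentinel pattern used by close can be sketched as below. This is a minimal illustration under stated assumptions: stdlib `Thread::Queue` stands in for `Async::Queue`, and a thread stands in for a fiber blocked in receive.

```ruby
# Assumption: Thread::Queue (stdlib) stands in for Async::Queue here.
queue = Thread::Queue.new
seen = []

# A consumer that keeps receiving until it sees the nil sentinel.
consumer = Thread.new do
  while (msg = queue.pop) # pop blocks until something is enqueued
    seen << msg
  end
  :closed # reached only after nil was dequeued
end

queue.push("hello")
queue.push(nil) # what #close does: enqueue nil so the blocked waiter wakes up

result = consumer.value # joins the thread and returns its last expression
```

Because the sentinel travels through the same queue as ordinary messages, anything enqueued before it is still delivered before the consumer sees nil.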

#close_read ⇒ Object

Wakes any waiters with nil, leaving the send side untouched (PULL has no send side, so close_read is identical to close here; it is kept separate to satisfy the Socket#close_read contract).



# File 'lib/nnq/routing/pull.rb', line 49

def close_read
  @queue.enqueue(nil)
end

#direct_recv_for(_conn) ⇒ Object

Inproc fast-path hook: return the routing recv queue so the peer pipe can enqueue directly, skipping the recv pump fiber. Identity transform — PULL bodies are the user payload already.



# File 'lib/nnq/routing/pull.rb', line 28

def direct_recv_for(_conn)
  [@queue, nil]
end
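How a peer might consume the returned pair can be sketched as follows. This assumes the second element is an optional transform and that nil means "pass the body through unchanged", which the description above implies but does not spell out. `PullSketch`, the `deliver` lambda, and the `:hypothetical_conn` argument are all illustrative names, and `Thread::Queue` stands in for `Async::Queue`.

```ruby
# Hypothetical stand-in for the routing object; not the NNQ class itself.
class PullSketch
  def initialize
    @queue = Thread::Queue.new # assumption: stdlib queue instead of Async::Queue
  end

  # Mirrors the documented method: hand out the queue plus a nil transform.
  def direct_recv_for(_conn)
    [@queue, nil]
  end

  def receive
    @queue.pop
  end
end

pull = PullSketch.new
queue, transform = pull.direct_recv_for(:hypothetical_conn)

# Hypothetical peer-side delivery: apply the transform if one was given,
# then push straight into the routing queue, with no pump in between.
deliver = lambda do |body|
  body = transform.call(body) if transform
  queue.push(body)
end

deliver.call("payload")
msg = pull.receive
```

With a nil transform the body reaches the receiver untouched, which matches the "identity transform" wording in the description.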

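#enqueue(body, _conn = nil) ⇒ Object

Appends a received message body to the queue. Called by the per-connection recv fibers for each message; the _conn argument is ignored.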



# File 'lib/nnq/routing/pull.rb', line 20

def enqueue(body, _conn = nil)
  @queue.enqueue(body)
end

#receive ⇒ String?

Waits until a message is available and returns its body, or returns nil if the queue was closed.

Returns:

  • (String, nil)

    message body, or nil if the queue was closed



# File 'lib/nnq/routing/pull.rb', line 34

def receive
  @queue.dequeue
end