Class: NNQ::Transport::Inproc::Pipe
- Inherits: Object
- Defined in: lib/nnq/transport/inproc/pipe.rb
Overview
Queue-based in-process pipe. Duck-types Connection so routing strategies, the recv loop, and the send pump work against it unchanged.
No wire framing: bodies are transferred as frozen Strings through a pair of Async::Queue instances (one per direction). When an SP backtrace header is supplied (REQ/REP/SURVEYOR paths), it is prepended before enqueue, so #receive_message returns an already-prefixed body. This matches the TCP/IPC framing semantics, so routing’s `parse_backtrace` parses the same layout either way.
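As a rough illustration of the header-prepend step, the sketch below mirrors the `header ? header + body : body` expression from #send_message. The names `prepend_header` and `split_prefixed`, the fixed 4-byte header size, and the placeholder header value are all assumptions made for this example; the real header layout is whatever routing’s `parse_backtrace` expects.

```ruby
# Hypothetical helper mirroring the concatenation in Pipe#send_message.
def prepend_header(body, header: nil)
  header ? header + body : body
end

# Hypothetical receiver-side split, assuming a fixed 4-byte header.
# The real layout is defined by routing's parse_backtrace, not by this sketch.
def split_prefixed(wire, header_size = 4)
  [wire.byteslice(0, header_size),
   wire.byteslice(header_size, wire.bytesize - header_size)]
end

header = [0x80000001].pack("N") # placeholder value, big-endian 32-bit
wire   = prepend_header("ping".b, header: header)

prefix, payload = split_prefixed(wire)
puts prefix.unpack1("N").to_s(16) # 80000001
puts payload                      # ping
puts prepend_header("ping".b)     # ping (no header: body passes through unchanged)
```

Because the receiver sees one already-prefixed String, the same split logic applies whether the bytes arrived over a socket or through a queue.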
Direct-recv fast path: when a routing strategy calls #wire_direct_recv on the peer side of a pipe pair, subsequent #send_message calls enqueue straight into the consumer’s recv queue, skipping both the intermediate pipe queue and the recv pump fiber. This cuts three fiber hops to one and is what lets inproc PUSH/PULL clear 1M msg/s on YJIT.
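The routing decision behind the fast path can be sketched as follows. This is a simplified model, not the library code: `SketchQueue` is a stand-in for Async::Queue built on the stdlib `Thread::Queue`, `FastPathPipe` is a hypothetical trimmed-down pipe, and the transform argument and headers are left out.

```ruby
# Stand-in for Async::Queue (assumption): same enqueue/dequeue names,
# backed by the stdlib Thread::Queue so the sketch needs no gems.
class SketchQueue
  def initialize
    @q = Thread::Queue.new
  end

  def enqueue(item)
    @q.push(item)
  end

  def dequeue
    @q.pop
  end

  def size
    @q.size
  end
end

# Hypothetical pipe that models only the send-side routing decision.
class FastPathPipe
  def initialize(send_queue:)
    @send_queue = send_queue
    @direct_recv_queue = nil
  end

  def wire_direct_recv(queue)
    @direct_recv_queue = queue
  end

  def send_message(body)
    # Once wired, the intermediate pipe queue is bypassed entirely.
    (@direct_recv_queue || @send_queue).enqueue(body)
  end
end

pipe_queue     = SketchQueue.new
consumer_queue = SketchQueue.new # stands in for the consumer's recv queue
pipe = FastPathPipe.new(send_queue: pipe_queue)

pipe.send_message("slow path")  # lands in the pipe queue
pipe.wire_direct_recv(consumer_queue)
pipe.send_message("fast path")  # skips the pipe queue

puts pipe_queue.size     # 1
puts consumer_queue.size # 1
```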
Wiring happens synchronously inside connect (before the call returns to the caller), so there’s no window in which a send can precede a wire — no pending buffer needed.
Close protocol: #close enqueues a `nil` sentinel onto the send side (or the direct queue if wired). The peer’s recv loop sees `nil`, raises `EOFError`, and unwinds via its connection supervisor.
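A minimal sketch of the sentinel handshake, under stated assumptions: `SketchQueue` stands in for Async::Queue using the stdlib `Thread::Queue`, `ClosingPipe` and `sketch_pipe_pair` are hypothetical names, `IOError` substitutes for the library’s ClosedError, and the direct-recv wiring is omitted.

```ruby
# Stand-in for Async::Queue (assumption): same enqueue/dequeue names,
# backed by the stdlib Thread::Queue so the sketch needs no gems.
class SketchQueue
  def initialize
    @q = Thread::Queue.new
  end

  def enqueue(item)
    @q.push(item)
  end

  def dequeue
    @q.pop
  end
end

# Hypothetical trimmed-down pipe: no direct-recv wiring, no headers.
class ClosingPipe
  def initialize(send_queue:, recv_queue:)
    @send_queue = send_queue
    @recv_queue = recv_queue
    @closed = false
  end

  def send_message(body)
    raise IOError, "connection closed" if @closed # the real class raises ClosedError
    @send_queue.enqueue(body)
  end

  def close
    return if @closed # idempotent: only one sentinel is ever enqueued
    @closed = true
    @send_queue.enqueue(nil)
  end

  def receive_message
    item = @recv_queue.dequeue
    raise EOFError, "connection closed" if item.nil?
    item
  end
end

# Build a connected pair: each side's send queue is the other's recv queue.
def sketch_pipe_pair
  a_to_b = SketchQueue.new
  b_to_a = SketchQueue.new
  [ClosingPipe.new(send_queue: a_to_b, recv_queue: b_to_a),
   ClosingPipe.new(send_queue: b_to_a, recv_queue: a_to_b)]
end

a, b = sketch_pipe_pair
a.send_message("last message")
a.close

puts b.receive_message # last message
begin
  b.receive_message
rescue EOFError => e
  puts "peer closed: #{e.message}" # peer closed: connection closed
end
```

Messages enqueued before the sentinel are still delivered in order, so the peer drains pending traffic before it observes the close.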
Instance Attribute Summary
- #direct_recv_queue ⇒ Async::Queue? (readonly): When non-nil, #send_message enqueues here instead of into @send_queue.
- #endpoint ⇒ String? (readonly): Endpoint URI this pipe was established on.
- #peer ⇒ Pipe?: The other end of the pair.
Instance Method Summary
- #close ⇒ Object
- #closed? ⇒ Boolean
- #flush ⇒ Object: No-op, since Async::Queue has no IO buffer to flush.
- #initialize(send_queue:, recv_queue:, endpoint:) ⇒ Pipe (constructor): A new instance of Pipe.
- #receive_message ⇒ Object
- #send_message(body, header: nil) ⇒ Object (also: #write_message)
- #wire_direct_recv(queue, transform) ⇒ Object: Wires the direct-recv fast path.
- #write_messages(bodies) ⇒ Object
Constructor Details
#initialize(send_queue:, recv_queue:, endpoint:) ⇒ Pipe
Returns a new instance of Pipe.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 45
    def initialize(send_queue:, recv_queue:, endpoint:)
      @send_queue = send_queue
      @recv_queue = recv_queue
      @endpoint = endpoint
      @closed = false
      @peer = nil
      @direct_recv_queue = nil
      @direct_recv_transform = nil
    end
Instance Attribute Details
#direct_recv_queue ⇒ Async::Queue? (readonly)
Returns the queue that #send_message enqueues into instead of @send_queue, or nil when the pipe is not direct-wired.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 42
    def direct_recv_queue
      @direct_recv_queue
    end
#endpoint ⇒ String? (readonly)
Returns the endpoint URI this pipe was established on.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 35
    def endpoint
      @endpoint
    end
#peer ⇒ Pipe?
Returns the other end of the pair.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 38
    def peer
      @peer
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/nnq/transport/inproc/pipe.rb', line 119
    def close
      return if @closed
      @closed = true
      # Close sentinel goes on whichever queue the peer is reading.
      # When direct-wired, @send_queue is unused; hit the direct
      # queue so the consumer unblocks.
      (@direct_recv_queue || @send_queue).enqueue(nil)
    end
#closed? ⇒ Boolean
    # File 'lib/nnq/transport/inproc/pipe.rb', line 114
    def closed?
      @closed
    end
#flush ⇒ Object
No-op — Async::Queue has no IO buffer to flush.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 102
    def flush
      nil
    end
#receive_message ⇒ Object
    # File 'lib/nnq/transport/inproc/pipe.rb', line 107
    def receive_message
      item = @recv_queue.dequeue
      raise EOFError, "connection closed" if item.nil?
      item
    end
#send_message(body, header: nil) ⇒ Object Also known as: write_message
    # File 'lib/nnq/transport/inproc/pipe.rb', line 70
    def send_message(body, header: nil)
      raise ClosedError, "connection closed" if @closed
      wire = header ? header + body : body
      if (q = @direct_recv_queue)
        item = @direct_recv_transform ? @direct_recv_transform.call(wire) : wire
        q.enqueue(item) unless item.nil?
      else
        @send_queue.enqueue(wire)
      end
    end
#wire_direct_recv(queue, transform) ⇒ Object
Wires the direct-recv fast path. After this call, messages sent on this pipe bypass the intermediate pipe queue and land directly in queue.
    # File 'lib/nnq/transport/inproc/pipe.rb', line 64
    def wire_direct_recv(queue, transform)
      @direct_recv_transform = transform
      @direct_recv_queue = queue
    end
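To show what the transform argument does once wired, here is a small sketch of the direct-recv branch as it appears in #send_message: the transform (if any) is applied, and a `nil` result drops the message. The function name `deliver_direct`, the use of a plain Array in place of Async::Queue, and the topic-prefix filter are assumptions for illustration only.

```ruby
# Hypothetical free-standing version of the direct-recv branch in #send_message.
# `queue` is a plain Array here; the real code enqueues into an Async::Queue.
def deliver_direct(queue, transform, wire)
  item = transform ? transform.call(wire) : wire
  queue << item unless item.nil?
  queue
end

# Hypothetical transform: keep only messages under one topic prefix.
topic_filter = ->(wire) { wire.start_with?("weather.") ? wire : nil }

inbox = []
deliver_direct(inbox, topic_filter, "weather.berlin 21C")
deliver_direct(inbox, topic_filter, "sports.score 2:1") # transform returns nil, so it is dropped
deliver_direct(inbox, nil, "unfiltered")                 # no transform: passed through as-is

p inbox # ["weather.berlin 21C", "unfiltered"]
```

Since a `nil` item is never enqueued by this branch, a transform cannot accidentally inject the close sentinel into the consumer’s queue.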
#write_messages(bodies) ⇒ Object
    # File 'lib/nnq/transport/inproc/pipe.rb', line 86
    def write_messages(bodies)
      raise ClosedError, "connection closed" if @closed
      if (q = @direct_recv_queue)
        transform = @direct_recv_transform
        bodies.each do |body|
          item = transform ? transform.call(body) : body
          q.enqueue(item) unless item.nil?
        end
      else
        bodies.each { |body| @send_queue.enqueue(body) }
      end
    end