Module: NNQ::Transport::Inproc
Defined in: lib/nnq/transport/inproc.rb
Overview
In-process transport. Both peers live in the same process and exchange frames over a Unix socketpair — no network, no address.
Unlike omq’s DirectPipe, inproc here still runs through Protocol::SP: the socketpair just replaces TCP. Kernel buffering across the pair is plenty to avoid contention for typical in-process message sizes, and reusing the SP handshake + framing keeps the transport ~40 LOC instead of a parallel Connection implementation.
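As a rough illustration of the idea above, the sketch below pushes length-prefixed frames across a `UNIXSocket.pair` inside a single process. The `write_frame` and `read_frame` helpers and the 8-byte big-endian length prefix are assumptions made for this example; they are not the Protocol::SP API, which handles the real handshake and framing.

```ruby
require "socket"

# Hypothetical helper: write one frame as an 8-byte big-endian length
# followed by the payload bytes. Not part of nnq or Protocol::SP.
def write_frame(io, payload)
  io.write([payload.bytesize].pack("Q>"), payload)
end

# Hypothetical helper: read one frame written by write_frame.
def read_frame(io)
  size = io.read(8).unpack1("Q>")
  io.read(size)
end

# Two connected stream sockets living in the same process.
a, b = UNIXSocket.pair

# Small frames fit in the kernel buffer, so the writes complete
# before anything has been read from the other end.
write_frame(a, "hello")
write_frame(a, "world")

frames = [read_frame(b), read_frame(b)]

a.close
b.close
```

Because the pair is an ordinary stream socket, any framing code written for TCP can run over it unchanged, which is the property the transport relies on.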
Defined Under Namespace
Classes: Listener
Class Method Summary
- .bind(endpoint, engine) ⇒ Listener
  Binds engine to endpoint in the process-global registry.
- .connect(endpoint, engine) ⇒ void
  Connects engine to a bound inproc endpoint.
- .reset! ⇒ Object
  Clears the registry.
- .unbind(endpoint) ⇒ Object
  Removes endpoint from the registry.
Class Method Details
.bind(endpoint, engine) ⇒ Listener
Binds engine to endpoint in the process-global registry.
    # File 'lib/nnq/transport/inproc.rb', line 29
    def bind(endpoint, engine)
      @mutex.synchronize do
        raise Error, "inproc endpoint already bound: #{endpoint}" if @registry.key?(endpoint)
        @registry[endpoint] = engine
      end
      Listener.new(endpoint)
    end
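The registry behaviour can be tried in isolation with a stand-in like the one below. `SketchRegistry` and its `lookup` method are hypothetical names for this example; the real module keeps its state in module-level instance variables and returns a Listener from bind, which this sketch leaves out.

```ruby
# Hypothetical stand-in for the process-global registry; it mirrors the
# mutex-guarded hash shown in the source but is not the nnq module.
class SketchRegistry
  Error = Class.new(StandardError)

  def initialize
    @mutex = Mutex.new
    @registry = {}
  end

  # Registers engine under endpoint; a second bind of the same name fails.
  def bind(endpoint, engine)
    @mutex.synchronize do
      raise Error, "inproc endpoint already bound: #{endpoint}" if @registry.key?(endpoint)
      @registry[endpoint] = engine
    end
  end

  # Frees the name so it can be bound again.
  def unbind(endpoint)
    @mutex.synchronize { @registry.delete(endpoint) }
  end

  # Illustrative accessor, not present in the documented module.
  def lookup(endpoint)
    @mutex.synchronize { @registry[endpoint] }
  end
end

registry = SketchRegistry.new
registry.bind("inproc://demo", :engine_a)

duplicate_rejected =
  begin
    registry.bind("inproc://demo", :engine_b)
    false
  rescue SketchRegistry::Error
    true
  end

registry.unbind("inproc://demo")
registry.bind("inproc://demo", :engine_b) # allowed again after unbind
```

The check and the assignment sit inside one `synchronize` block, so two threads binding the same endpoint cannot both pass the `key?` test.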
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects engine to a bound inproc endpoint. Creates a Unix socketpair, hands one side to the bound engine (accepted), the other to the connecting engine (connected). Both sides run the normal SP handshake concurrently.
    # File 'lib/nnq/transport/inproc.rb', line 47
    def connect(endpoint, engine)
      bound = @mutex.synchronize { @registry[endpoint] }
      raise Error, "inproc endpoint not bound: #{endpoint}" unless bound

      a, b = UNIXSocket.pair

      # Handshake on the bound side must run concurrently with
      # ours — if we called bound.handle_accepted synchronously
      # it would block on reading our greeting before we've had
      # a chance to write it.
      bound.spawn_task(annotation: "nnq inproc accept #{endpoint}") do
        bound.handle_accepted(IO::Stream::Buffered.wrap(b), endpoint: endpoint)
      end

      engine.handle_connected(IO::Stream::Buffered.wrap(a), endpoint: endpoint)
    end
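The need for concurrency can be seen in a small stand-alone sketch. It assumes a toy handshake in which each side writes a fixed-size greeting and then reads the peer's; the `handshake` helper, the greeting strings, and the use of a plain `Thread` in place of the engine's `spawn_task` are all assumptions for illustration, not the SP handshake itself.

```ruby
require "socket"

GREETING_SIZE = 12 # both illustrative greetings below are 12 bytes long

# Hypothetical handshake: send our greeting, then block until the
# peer's greeting has arrived in full.
def handshake(io, greeting)
  io.write(greeting)
  io.read(GREETING_SIZE)
end

a, b = UNIXSocket.pair

# Run the accepting side in the background. Calling it inline here
# would block in io.read, because the connecting side below would
# never get the chance to write its greeting.
accept_side = Thread.new { handshake(b, "server-hello") }

seen_by_connector = handshake(a, "client-hello")
seen_by_acceptor  = accept_side.value # joins the thread and returns its result

a.close
b.close
```

Each side ends up holding the other's greeting, and neither call can finish until both have written.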
.reset! ⇒ Object
Clears the registry. For tests.
    # File 'lib/nnq/transport/inproc.rb', line 71
    def reset!
      @mutex.synchronize { @registry.clear }
    end
.unbind(endpoint) ⇒ Object
Removes endpoint from the registry. Called by Listener#stop.
    # File 'lib/nnq/transport/inproc.rb', line 65
    def unbind(endpoint)
      @mutex.synchronize { @registry.delete(endpoint) }
    end