Module: NNQ::Transport::Inproc
- Defined in:
- lib/nnq/transport/inproc.rb,
lib/nnq/transport/inproc/pipe.rb
Overview
In-process transport. Both peers live in the same process and exchange frozen Strings through a pair of Async::Queues — no wire framing, no socketpair, no SP handshake.
The historical implementation ran through a Unix ‘socketpair(2)` and the full SP protocol, making inproc roughly as expensive as IPC. Swapping to Pipe (duck-types Connection) drops the kernel buffer copy, the framing encode/decode, and the handshake — inproc becomes a pure in-process queue transfer.
Defined Under Namespace
Class Method Summary collapse
-
.bind(endpoint, engine) ⇒ Listener
Binds
enginetoendpointin the process-global registry. -
.connect(endpoint, engine) ⇒ void
Connects
engineto a bound inproc endpoint. -
.reset! ⇒ Object
Clears the registry.
-
.unbind(endpoint) ⇒ Object
Removes
endpointfrom the registry.
Class Method Details
.bind(endpoint, engine) ⇒ Listener
Binds engine to endpoint in the process-global registry.
32 33 34 35 36 37 38 39 |
# File 'lib/nnq/transport/inproc.rb', line 32 def bind(endpoint, engine, **) @mutex.synchronize do raise Error, "inproc endpoint already bound: #{endpoint}" if @registry.key?(endpoint) @registry[endpoint] = engine end Listener.new(endpoint) end |
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects engine to a bound inproc endpoint. Creates a Pipe pair — one queue per direction — and registers each side with its owning engine via Engine#connection_ready. No handshake runs; both ends are live as soon as the pipes are wired.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/nnq/transport/inproc.rb', line 50 def connect(endpoint, engine, **) bound = @mutex.synchronize { @registry[endpoint] } raise Error, "inproc endpoint not bound: #{endpoint}" unless bound a_to_b = Async::Queue.new b_to_a = Async::Queue.new client = Pipe.new(send_queue: a_to_b, recv_queue: b_to_a, endpoint: endpoint) server = Pipe.new(send_queue: b_to_a, recv_queue: a_to_b, endpoint: endpoint) client.peer = server server.peer = client bound.connection_ready(server, endpoint: endpoint) engine.connection_ready(client, endpoint: endpoint) end |
.reset! ⇒ Object
Clears the registry. For tests.
73 74 75 |
# File 'lib/nnq/transport/inproc.rb', line 73 def reset! @mutex.synchronize { @registry.clear } end |
.unbind(endpoint) ⇒ Object
Removes endpoint from the registry. Called by Listener#stop.
67 68 69 |
# File 'lib/nnq/transport/inproc.rb', line 67 def unbind(endpoint) @mutex.synchronize { @registry.delete(endpoint) } end |