Module: NNQ::Transport::Inproc

Defined in:
lib/nnq/transport/inproc.rb,
lib/nnq/transport/inproc/pipe.rb

Overview

In-process transport. Both peers live in the same process and exchange frozen Strings through a pair of Async::Queues — no wire framing, no socketpair, no SP handshake.

The historical implementation ran through a Unix ‘socketpair(2)` and the full SP protocol, making inproc roughly as expensive as IPC. Swapping to Pipe (duck-types Connection) drops the kernel buffer copy, the framing encode/decode, and the handshake — inproc becomes a pure in-process queue transfer.

Defined Under Namespace

Classes: Listener, Pipe

Class Method Summary collapse

Class Method Details

.bind(endpoint, engine) ⇒ Listener

Binds engine to endpoint in the process-global registry.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

Returns:



32
33
34
35
36
37
38
39
# File 'lib/nnq/transport/inproc.rb', line 32

def bind(endpoint, engine, **)
  @mutex.synchronize do
    raise Error, "inproc endpoint already bound: #{endpoint}" if @registry.key?(endpoint)
    @registry[endpoint] = engine
  end

  Listener.new(endpoint)
end

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects engine to a bound inproc endpoint. Creates a Pipe pair — one queue per direction — and registers each side with its owning engine via Engine#connection_ready. No handshake runs; both ends are live as soon as the pipes are wired.

Parameters:

  • endpoint (String)
  • engine (Engine)

Raises:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/nnq/transport/inproc.rb', line 50

def connect(endpoint, engine, **)
  bound = @mutex.synchronize { @registry[endpoint] }
  raise Error, "inproc endpoint not bound: #{endpoint}" unless bound

  a_to_b = Async::Queue.new
  b_to_a = Async::Queue.new
  client = Pipe.new(send_queue: a_to_b, recv_queue: b_to_a, endpoint: endpoint)
  server = Pipe.new(send_queue: b_to_a, recv_queue: a_to_b, endpoint: endpoint)
  client.peer = server
  server.peer = client

  bound.connection_ready(server, endpoint: endpoint)
  engine.connection_ready(client, endpoint: endpoint)
end

.reset!Object

Clears the registry. For tests.



73
74
75
# File 'lib/nnq/transport/inproc.rb', line 73

def reset!
  @mutex.synchronize { @registry.clear }
end

.unbind(endpoint) ⇒ Object

Removes endpoint from the registry. Called by Listener#stop.



67
68
69
# File 'lib/nnq/transport/inproc.rb', line 67

def unbind(endpoint)
  @mutex.synchronize { @registry.delete(endpoint) }
end