Module: NNQ::Transport::Inproc

Defined in:
lib/nnq/transport/inproc.rb

Overview

In-process transport. Both peers live in the same process and exchange frames over a Unix socketpair — no network, no address.

Unlike omq’s DirectPipe, inproc here still runs through Protocol::SP: the socketpair just replaces TCP. Kernel buffering across the pair is plenty to avoid contention for typical in-process message sizes, and reusing the SP handshake + framing keeps the transport ~40 LOC instead of a parallel Connection implementation.

Defined Under Namespace

Classes: Listener

Class Method Summary collapse

Class Method Details

.bind(endpoint, engine) ⇒ Listener

Binds engine to endpoint in the process-global registry.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

Returns:



29
30
31
32
33
34
35
36
# File 'lib/nnq/transport/inproc.rb', line 29

def bind(endpoint, engine)
  @mutex.synchronize do
    raise Error, "inproc endpoint already bound: #{endpoint}" if @registry.key?(endpoint)
    @registry[endpoint] = engine
  end

  Listener.new(endpoint)
end

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects engine to a bound inproc endpoint. Creates a Unix socketpair, hands one side to the bound engine (accepted), the other to the connecting engine (connected). Both sides run the normal SP handshake concurrently.

Parameters:

  • endpoint (String)
  • engine (Engine)

Raises:



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/nnq/transport/inproc.rb', line 47

def connect(endpoint, engine)
  bound = @mutex.synchronize { @registry[endpoint] }
  raise Error, "inproc endpoint not bound: #{endpoint}" unless bound

  a, b = UNIXSocket.pair

  # Handshake on the bound side must run concurrently with
  # ours — if we called bound.handle_accepted synchronously
  # it would block on reading our greeting before we've had
  # a chance to write it.
  bound.spawn_task(annotation: "nnq inproc accept #{endpoint}") do
    bound.handle_accepted(IO::Stream::Buffered.wrap(b), endpoint: endpoint)
  end
  engine.handle_connected(IO::Stream::Buffered.wrap(a), endpoint: endpoint)
end

.reset!Object

Clears the registry. For tests.



71
72
73
# File 'lib/nnq/transport/inproc.rb', line 71

def reset!
  @mutex.synchronize { @registry.clear }
end

.unbind(endpoint) ⇒ Object

Removes endpoint from the registry. Called by Listener#stop.



65
66
67
# File 'lib/nnq/transport/inproc.rb', line 65

def unbind(endpoint)
  @mutex.synchronize { @registry.delete(endpoint) }
end