Module: OMQ::Transport::Inproc

Defined in:
lib/omq/transport/inproc.rb,
lib/omq/transport/inproc/direct_pipe.rb

Overview

In-process transport.

Both peers are Ruby backend sockets in the same process; native ZMQ’s inproc registry is separate and cannot be reached from here. Messages are transferred as Ruby arrays, with no ZMTP framing and no byte serialization. Writable#send freezes string parts, which prevents shared mutable state without copying.
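To make the zero-copy handoff concrete, here is a minimal sketch. It assumes nothing about OMQ internals: freeze_string_parts is a hypothetical stand-in for the freezing attributed to Writable#send, and a plain Queue stands in for the in-process link.

```ruby
# Hypothetical stand-in for the freezing step; not the OMQ source.
def freeze_string_parts(parts)
  parts.each { |part| part.freeze if part.is_a?(String) }
  parts
end

pipe = Queue.new                 # stands in for the in-process link
sent = [+"topic", +"payload"]    # unary plus yields unfrozen strings
pipe << freeze_string_parts(sent)

received = pipe.pop
puts received.equal?(sent)       # the receiver holds the very same Array object

begin
  received.first << "!"          # attempt to mutate a frozen String
rescue FrozenError => error
  puts "rejected: #{error.class}"
end
```

Because the strings are frozen in place, either side can keep a reference to them without the other side being able to change their contents.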

Defined Under Namespace

Classes: DirectPipe, Listener

Constant Summary

COMMAND_TYPES = %i[PUB SUB XPUB XSUB RADIO DISH].freeze

Socket types that exchange commands (SUBSCRIBE/CANCEL) over inproc.
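As an illustration of how such a list might be consulted, the sketch below copies the constant and adds a membership check. The helper name command_socket? is hypothetical and does not appear in the documented module.

```ruby
# Value copied from the constant summary; the helper below is hypothetical.
COMMAND_TYPES = %i[PUB SUB XPUB XSUB RADIO DISH].freeze

# True when the given socket type is one that exchanges commands.
def command_socket?(socket_type)
  COMMAND_TYPES.include?(socket_type)
end

puts command_socket?(:SUB)
puts command_socket?(:REQ)
```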

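Class Method Summary

  • .bind(endpoint, engine) ⇒ Listener

    Binds an engine to an inproc endpoint.

  • .connect(endpoint, engine) ⇒ void

    Connects to a bound inproc endpoint.

  • .reset! ⇒ void

    Resets the registry.

  • .unbind(endpoint) ⇒ void

    Removes a bound endpoint from the registry.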

Class Method Details

.bind(endpoint, engine) ⇒ Listener

Binds an engine to an inproc endpoint.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

    the owning engine

Returns:

  • (Listener)

Raises:

  • (ArgumentError)

    if endpoint is already bound



# File 'lib/omq/transport/inproc.rb', line 38

def bind(endpoint, engine)
  @mutex.synchronize do
    if @registry.key?(endpoint)
      raise ArgumentError, "endpoint already bound: #{endpoint}"
    end

    @registry[endpoint] = engine

    # Wake any pending connects
    @waiters[endpoint].each { |p| p.resolve(true) }
    @waiters.delete(endpoint)
  end
  Listener.new(endpoint)
end
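The duplicate-bind check can be tried in isolation with a small stand-in. MiniRegistry below is a hypothetical class that reproduces only the mutex-guarded hash and the ArgumentError from the listing above; it omits waiters and the Listener return value.

```ruby
# Hypothetical miniature of the registry; not the OMQ implementation.
class MiniRegistry
  def initialize
    @mutex = Mutex.new
    @registry = {}
  end

  # Registers the engine, refusing a second bind on the same endpoint.
  def bind(endpoint, engine)
    @mutex.synchronize do
      if @registry.key?(endpoint)
        raise ArgumentError, "endpoint already bound: #{endpoint}"
      end

      @registry[endpoint] = engine
    end
  end
end

registry = MiniRegistry.new
registry.bind("inproc://my-endpoint", :first_engine)

begin
  registry.bind("inproc://my-endpoint", :second_engine)
rescue ArgumentError => error
  puts error.message
end
```

Doing the check and the insertion inside one synchronize block is what keeps two threads from both passing the key? test for the same endpoint.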

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects to a bound inproc endpoint.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

    the connecting engine



# File 'lib/omq/transport/inproc.rb', line 60

def connect(endpoint, engine)
  bound_engine = @mutex.synchronize { @registry[endpoint] }
  bound_engine ||= await_bind(endpoint, engine) or return
  establish_link(engine, bound_engine, endpoint)
end
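The listing calls await_bind when the endpoint is not yet bound, but that helper is not shown here. The following sketch shows one way a connect-before-bind wait could work, under stated assumptions: WaitingRegistry and lookup are hypothetical names, and a Queue per waiter replaces whatever promise-like object the real code resolves with p.resolve(true). Unlike the listing, the sketch checks the registry and registers the waiter in a single critical section.

```ruby
# Hypothetical sketch of connect-before-bind; not the OMQ implementation.
class WaitingRegistry
  def initialize
    @mutex = Mutex.new
    @registry = {}
    @waiters = Hash.new { |hash, key| hash[key] = [] }
  end

  def bind(endpoint, engine)
    @mutex.synchronize do
      raise ArgumentError, "endpoint already bound: #{endpoint}" if @registry.key?(endpoint)

      @registry[endpoint] = engine
      # Wake every lookup that arrived before this bind.
      waiting = @waiters.delete(endpoint) || []
      waiting.each { |queue| queue << true }
    end
  end

  # Returns the bound engine, blocking until a bind happens if needed.
  def lookup(endpoint)
    queue = Queue.new
    @mutex.synchronize do
      engine = @registry[endpoint]
      return engine if engine

      @waiters[endpoint] << queue
    end
    queue.pop # blocks until bind pushes a wake-up token
    @mutex.synchronize { @registry[endpoint] }
  end
end

registry = WaitingRegistry.new
connector = Thread.new { registry.lookup("inproc://late") }
registry.bind("inproc://late", :server_engine)
puts connector.value.inspect
```

The result is the same whether the thread reaches lookup before or after the bind: an early lookup parks on its queue, a late one finds the entry immediately.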

.reset! ⇒ void

This method returns an undefined value.

Resets the registry. Used in tests.



# File 'lib/omq/transport/inproc.rb', line 81

def reset!
  @mutex.synchronize do
    @registry.clear
    @waiters.clear
  end
end

.unbind(endpoint) ⇒ void

This method returns an undefined value.

Removes a bound endpoint from the registry.

Parameters:

  • endpoint (String)


# File 'lib/omq/transport/inproc.rb', line 72

def unbind(endpoint)
  @mutex.synchronize { @registry.delete(endpoint) }
end
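Since unbind simply deletes the registry entry, the endpoint name becomes available again afterwards. The sketch below demonstrates that with RebindableRegistry, a hypothetical stand-in that copies only the bind check and the delete from the listings on this page.

```ruby
# Hypothetical stand-in for the registry; not the OMQ implementation.
class RebindableRegistry
  def initialize
    @mutex = Mutex.new
    @registry = {}
  end

  def bind(endpoint, engine)
    @mutex.synchronize do
      raise ArgumentError, "endpoint already bound: #{endpoint}" if @registry.key?(endpoint)

      @registry[endpoint] = engine
    end
  end

  # Hash#delete returns nil for a missing key, so unbinding twice is harmless.
  def unbind(endpoint)
    @mutex.synchronize { @registry.delete(endpoint) }
  end
end

registry = RebindableRegistry.new
registry.bind("inproc://svc", :first_engine)
registry.unbind("inproc://svc")
registry.bind("inproc://svc", :second_engine) # succeeds after the unbind
puts registry.unbind("inproc://never-bound").inspect
```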