Module: OMQ::Transport::Inproc

Defined in:
lib/omq/transport/inproc.rb,
lib/omq/transport/inproc/direct_pipe.rb

Overview

In-process transport.

Both peers are Ruby backend sockets in the same process (native ZMQ’s inproc registry is separate and unreachable). Messages are transferred as Ruby arrays, with no ZMTP framing and no byte serialization. String parts are frozen by Writable#send, so the two peers never share mutable state and no copy is needed.
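The hand-off described above can be illustrated with a minimal sketch. It does not use OMQ itself: `freeze_parts` and `transfer` are hypothetical helpers, and a plain `Queue` stands in for the in-process pipe. It assumes only what the overview states, namely that String parts are frozen and the message array is passed by reference.

```ruby
# Freeze every String part in place; the Array itself is left as-is.
# `freeze_parts` is a hypothetical helper, not part of OMQ.
def freeze_parts(parts)
  parts.each(&:freeze)
end

# Hand a message from one "peer" to another through a plain Queue,
# which stands in for the in-process pipe (an assumption for illustration).
def transfer(parts)
  pipe = Queue.new
  pipe << freeze_parts(parts)
  pipe.pop
end

sent     = [+"topic", +"payload"] # unary + yields unfrozen Strings
received = transfer(sent)

puts received.equal?(sent)    # the receiver holds the very same Array object
puts received.all?(&:frozen?) # every String part is frozen

begin
  received.first << "!"       # any attempt to mutate a part is rejected
rescue FrozenError => e
  puts "mutation rejected: #{e.class}"
end
```

Because the parts are frozen, the receiver can safely keep references to the sender's Strings, which is what makes skipping the copy safe.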

Defined Under Namespace

Classes: DirectPipe, Listener

Constant Summary

COMMAND_TYPES = %i[PUB SUB XPUB XSUB RADIO DISH].freeze

Socket types that exchange commands (SUBSCRIBE/CANCEL) over inproc.

Class Attribute Details

.registry ⇒ Hash{String => Engine} (readonly)

Returns bound inproc endpoints.

Returns:

  • (Hash{String => Engine})

    bound inproc endpoints



# File 'lib/omq/transport/inproc.rb', line 36

def registry
  @registry
end

Class Method Details

.connect(endpoint, engine) ⇒ void

This method returns an undefined value.

Connects to a bound inproc endpoint.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

    the connecting engine



# File 'lib/omq/transport/inproc.rb', line 68

def connect(endpoint, engine, **)
  bound_engine = @mutex.synchronize { @registry[endpoint] }
  bound_engine ||= await_bind(endpoint, engine) or return
  establish_link(engine, bound_engine, endpoint)
end
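The second line of `connect` relies on operator precedence: `or` binds more loosely than `||=`, so the line reads as `(bound_engine ||= await_bind(...)) or return`. The sketch below reproduces that shape with hypothetical names (`resolve`, `fallback`); it assumes the fallback returns `nil` when it gives up, which the source does not show for `await_bind`.

```ruby
# Hypothetical lookup that mirrors the shape of `connect` above.
# `registry` is a plain Hash; `fallback` stands in for await_bind
# and is assumed to return nil when no engine becomes available.
def resolve(registry, endpoint, fallback)
  found = registry[endpoint]
  # Parsed as: (found ||= fallback.call(endpoint)) or return
  found ||= fallback.call(endpoint) or return
  "linked to #{found}"
end

registry = { "inproc://a" => :engine_a }

# Already bound: the fallback is never called.
p resolve(registry, "inproc://a", ->(_) { raise "not called" })

# Not bound yet: the fallback supplies the engine.
p resolve(registry, "inproc://b", ->(_) { :engine_b })

# Fallback gives up: the method returns nil without linking.
p resolve(registry, "inproc://c", ->(_) { nil })
```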

.listener(endpoint, engine) ⇒ Listener

Creates a bound inproc listener.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”

  • engine (Engine)

    the owning engine

Returns:

  • (Listener)

Raises:

  • (ArgumentError)

    if endpoint is already bound



# File 'lib/omq/transport/inproc.rb', line 46

def listener(endpoint, engine, **)
  @mutex.synchronize do
    if @registry.key?(endpoint)
      raise ArgumentError, "endpoint already bound: #{endpoint}"
    end

    @registry[endpoint] = engine

    # Wake any pending connects
    @waiters[endpoint].each { |p| p.resolve(true) }
    @waiters.delete(endpoint)
  end
  Listener.new(endpoint)
end
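The bind-then-wake pattern in `listener` can be tried in isolation with the following sketch. `MiniRegistry` is a hypothetical, simplified stand-in and not OMQ's implementation: it uses a `Queue` per waiter where the source resolves a promise-like object, and it assumes the waiter table is a Hash whose default block creates an empty Array.

```ruby
# MiniRegistry is a hypothetical stand-in for the module-level state
# used by `listener`; names and structure are assumptions.
class MiniRegistry
  def initialize
    @mutex    = Mutex.new
    @registry = {}
    # Each missing key gets its own Array of waiters.
    @waiters  = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers an owner for the endpoint and wakes pending waiters.
  def bind(endpoint, owner)
    @mutex.synchronize do
      raise ArgumentError, "endpoint already bound: #{endpoint}" if @registry.key?(endpoint)

      @registry[endpoint] = owner
      @waiters[endpoint].each { |queue| queue << true }
      @waiters.delete(endpoint)
    end
    owner
  end

  # Returns the owner, blocking until someone binds the endpoint.
  def wait_for(endpoint)
    queue = nil
    @mutex.synchronize do
      return @registry[endpoint] if @registry.key?(endpoint)

      queue = Queue.new
      @waiters[endpoint] << queue
    end
    queue.pop # blocks outside the mutex until bind pushes a value
    @mutex.synchronize { @registry[endpoint] }
  end
end

registry = MiniRegistry.new
waiter   = Thread.new { registry.wait_for("inproc://demo") }
registry.bind("inproc://demo", :engine)
p waiter.value # the waiting thread receives the bound owner

begin
  registry.bind("inproc://demo", :other)
rescue ArgumentError => e
  puts e.message # a second bind on the same endpoint is rejected
end
```

Registering the waiter and checking the registry under the same mutex means a bind cannot slip in between the check and the registration, so the waiter is either answered immediately or woken later.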

.reset! ⇒ void

This method returns an undefined value.

Resets the registry. Used in tests.



# File 'lib/omq/transport/inproc.rb', line 89

def reset!
  @mutex.synchronize do
    @registry.clear
    @waiters.clear
  end
end

.unbind(endpoint) ⇒ void

This method returns an undefined value.

Removes a bound endpoint from the registry.

Parameters:

  • endpoint (String)

    e.g. “inproc://my-endpoint”


# File 'lib/omq/transport/inproc.rb', line 80

def unbind(endpoint)
  @mutex.synchronize { @registry.delete(endpoint) }
end