Module: OMQ::Transport::Inproc
- Defined in:
- lib/omq/transport/inproc.rb,
lib/omq/transport/inproc/direct_pipe.rb
Overview
In-process transport.
Both peers are Ruby backend sockets in the same process (native ZMQ’s inproc registry is separate and unreachable). Messages are transferred as Ruby arrays — no ZMTP framing, no byte serialization. String parts are frozen by Writable#send to prevent shared mutable state without copying.
Defined Under Namespace
Classes: DirectPipe, Listener
Constant Summary collapse
- COMMAND_TYPES =
Socket types that exchange commands (SUBSCRIBE/CANCEL) over inproc.
%i[PUB SUB XPUB XSUB RADIO DISH].freeze
Class Method Summary collapse
-
.bind(endpoint, engine) ⇒ Listener
Binds an engine to an inproc endpoint.
-
.connect(endpoint, engine) ⇒ void
Connects to a bound inproc endpoint.
-
.reset! ⇒ void
Resets the registry.
-
.unbind(endpoint) ⇒ void
Removes a bound endpoint from the registry.
Class Method Details
.bind(endpoint, engine) ⇒ Listener
Binds an engine to an inproc endpoint.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/omq/transport/inproc.rb', line 38 def bind(endpoint, engine) @mutex.synchronize do if @registry.key?(endpoint) raise ArgumentError, "endpoint already bound: #{endpoint}" end @registry[endpoint] = engine # Wake any pending connects @waiters[endpoint].each { |p| p.resolve(true) } @waiters.delete(endpoint) end Listener.new(endpoint) end |
.connect(endpoint, engine) ⇒ void
This method returns an undefined value.
Connects to a bound inproc endpoint.
60 61 62 63 64 |
# File 'lib/omq/transport/inproc.rb', line 60 def connect(endpoint, engine) bound_engine = @mutex.synchronize { @registry[endpoint] } bound_engine ||= await_bind(endpoint, engine) or return establish_link(engine, bound_engine, endpoint) end |
.reset! ⇒ void
This method returns an undefined value.
Resets the registry. Used in tests.
81 82 83 84 85 86 |
# File 'lib/omq/transport/inproc.rb', line 81 def reset! @mutex.synchronize do @registry.clear @waiters.clear end end |
.unbind(endpoint) ⇒ void
This method returns an undefined value.
Removes a bound endpoint from the registry.
72 73 74 |
# File 'lib/omq/transport/inproc.rb', line 72 def unbind(endpoint) @mutex.synchronize { @registry.delete(endpoint) } end |