Class: OMQ::Transport::Inproc::Pipe
- Inherits:
-
Object
- Object
- OMQ::Transport::Inproc::Pipe
- Defined in:
- lib/omq/transport/inproc/pipe.rb
Overview
A direct in-process pipe that transfers Ruby arrays through queues.
Implements the same interface as Connection so routing strategies can use it transparently.
When a routing strategy sets #direct_recv_queue on a pipe, #send_message enqueues directly into the peer’s recv queue, bypassing the intermediate pipe queues and the recv pump task. This reduces inproc from 3 queue hops to 2 (send_queue →recv_queue), eliminating the internal pipe queue in between.
Instance Attribute Summary collapse
-
#direct_recv_queue ⇒ Async::LimitedQueue?
readonly
When set, #send_message enqueues directly here instead of using the internal queue.
-
#peer ⇒ Pipe?
The other end of this pipe pair.
-
#peer_identity ⇒ String
readonly
Peer’s identity.
-
#peer_socket_type ⇒ String
readonly
Peer’s socket type.
Instance Method Summary collapse
-
#close ⇒ void
Closes this pipe end and sends a nil sentinel to the peer.
-
#encrypted? ⇒ Boolean
Always false; inproc pipes are never encrypted.
-
#flush ⇒ nil
No-op — inproc has no IO buffer to flush.
-
#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ Pipe
constructor
A new instance of Pipe.
-
#peer_major ⇒ Integer
Always 3 — inproc peers are OMQ.
-
#peer_minor ⇒ Integer
Always 1 — inproc peers are OMQ (ZMTP 3.1).
-
#read_frame ⇒ Protocol::ZMTP::Codec::Frame
Reads one frame.
-
#receive_message ⇒ Array<String>
Receives a multi-frame message.
-
#send_command(command) ⇒ Object
Sends a command via the internal command queue.
-
#send_message(parts) ⇒ void
(also: #write_message)
Sends a multi-frame message.
-
#wire_direct_recv(queue, transform) ⇒ void
Wires up the direct recv fast-path.
-
#write_messages(messages) ⇒ void
Batched form, for parity with Protocol::ZMTP::Connection.
Constructor Details
#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ Pipe
Returns a new instance of Pipe.
59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/omq/transport/inproc/pipe.rb', line 59 def initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) @send_queue = send_queue @receive_queue = receive_queue @peer_identity = peer_identity || "".b @peer_socket_type = peer_type @closed = false @peer = nil @direct_recv_queue = nil @direct_recv_transform = nil @pending_direct = nil end |
Instance Attribute Details
#direct_recv_queue ⇒ Async::LimitedQueue? (readonly)
Returns when set, #send_message enqueues directly here instead of using the internal queue.
50 51 52 |
# File 'lib/omq/transport/inproc/pipe.rb', line 50 def direct_recv_queue @direct_recv_queue end |
#peer ⇒ Pipe?
Returns the other end of this pipe pair.
44 45 46 |
# File 'lib/omq/transport/inproc/pipe.rb', line 44 def peer @peer end |
#peer_identity ⇒ String (readonly)
Returns peer’s identity.
39 40 41 |
# File 'lib/omq/transport/inproc/pipe.rb', line 39 def peer_identity @peer_identity end |
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type.
20 21 22 |
# File 'lib/omq/transport/inproc/pipe.rb', line 20 def peer_socket_type @peer_socket_type end |
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes this pipe end and sends a nil sentinel to the peer.
199 200 201 202 203 |
# File 'lib/omq/transport/inproc/pipe.rb', line 199 def close return if @closed @closed = true @send_queue&.enqueue(nil) # close sentinel end |
#encrypted? ⇒ Boolean
Returns always false; inproc pipes are never encrypted.
131 132 133 |
# File 'lib/omq/transport/inproc/pipe.rb', line 131 def encrypted? false end |
#flush ⇒ nil
No-op — inproc has no IO buffer to flush.
140 141 142 |
# File 'lib/omq/transport/inproc/pipe.rb', line 140 def flush nil end |
#peer_major ⇒ Integer
Returns always 3 — inproc peers are OMQ.
25 26 27 |
# File 'lib/omq/transport/inproc/pipe.rb', line 25 def peer_major 3 end |
#peer_minor ⇒ Integer
Returns always 1 — inproc peers are OMQ (ZMTP 3.1).
32 33 34 |
# File 'lib/omq/transport/inproc/pipe.rb', line 32 def peer_minor 1 end |
#read_frame ⇒ Protocol::ZMTP::Codec::Frame
Reads one frame. Used by PUB/XPUB subscription listeners, which must see both the legacy message-form subscription (ZMTP 3.0) and the command-form (ZMTP 3.1).
184 185 186 187 188 189 190 191 192 |
# File 'lib/omq/transport/inproc/pipe.rb', line 184 def read_frame item = @receive_queue.dequeue or raise EOFError, "connection closed" if item.is_a?(Array) && item.first == :command Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true) else Protocol::ZMTP::Codec::Frame.new(item.first || "".b) end end |
#receive_message ⇒ Array<String>
Receives a multi-frame message.
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/omq/transport/inproc/pipe.rb', line 150 def loop do item = @receive_queue.dequeue or raise EOFError, "connection closed" if item.is_a?(Array) && item.first == :command if block_given? yield Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true) end next end return item end end |
#send_command(command) ⇒ Object
Sends a command via the internal command queue. Only available for PUB/SUB-family pipes.
172 173 174 175 |
# File 'lib/omq/transport/inproc/pipe.rb', line 172 def send_command(command) raise IOError, "closed" if @closed @send_queue.enqueue([:command, command]) end |
#send_message(parts) ⇒ void Also known as: write_message
This method returns an undefined value.
Sends a multi-frame message.
101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/omq/transport/inproc/pipe.rb', line 101 def (parts) raise IOError, "closed" if @closed if @direct_recv_queue @direct_recv_queue.enqueue(apply_transform(parts)) elsif @send_queue @send_queue.enqueue(parts) else (@pending_direct ||= []) << apply_transform(parts) end end |
#wire_direct_recv(queue, transform) ⇒ void
This method returns an undefined value.
Wires up the direct recv fast-path. Called once by the recv pump when the receiving side of an inproc pipe pair is set up. After this, peer-side #send_message calls enqueue straight into queue instead of hopping through the intermediate pipe queue and a recv pump fiber.
Drains any messages the peer buffered into @pending_direct before the queue was available.
85 86 87 88 89 90 91 92 93 |
# File 'lib/omq/transport/inproc/pipe.rb', line 85 def wire_direct_recv(queue, transform) @direct_recv_transform = transform @direct_recv_queue = queue return unless @pending_direct @pending_direct.each { |msg| queue.enqueue(msg) } @pending_direct = nil end |
#write_messages(messages) ⇒ void
This method returns an undefined value.
Batched form, for parity with Protocol::ZMTP::Connection. The work-stealing pumps call this when they dequeue more than one message at once; Pipe just loops — no mutex to amortize.
124 125 126 |
# File 'lib/omq/transport/inproc/pipe.rb', line 124 def () .each { |parts| (parts) } end |