Class: OMQ::Transport::Inproc::DirectPipe
- Inherits: Object
- Defined in: lib/omq/transport/inproc/direct_pipe.rb
Overview
A direct in-process pipe that transfers Ruby arrays through queues.
Implements the same interface as Connection so routing strategies can use it transparently.
When a routing strategy sets #direct_recv_queue on a pipe, #send_message enqueues directly into the peer’s recv queue, bypassing the intermediate pipe queues and the recv pump task. This reduces the inproc path from three queue hops to two (send_queue → recv_queue) by eliminating the internal pipe queue in between.
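The routing described above can be sketched in a few lines. This is an illustration only, not the OMQ implementation: `SketchPipe` is a hypothetical stand-in for DirectPipe, and Ruby's stdlib `Queue` replaces the Async queues the real class works with.

```ruby
# Hypothetical stand-in for DirectPipe (assumption, not the library class).
class SketchPipe
  attr_accessor :direct_recv_queue

  def initialize(send_queue: nil)
    @send_queue = send_queue
    @direct_recv_queue = nil
  end

  # The direct queue wins whenever it is set; otherwise use the pipe queue.
  def send_message(parts)
    if @direct_recv_queue
      @direct_recv_queue.push(parts)
    else
      @send_queue.push(parts)
    end
  end
end

pipe_queue = Queue.new   # internal pipe queue (the hop being skipped)
recv_queue = Queue.new   # peer socket's recv queue

pipe = SketchPipe.new(send_queue: pipe_queue)
pipe.send_message(["queued"])          # goes through the internal pipe queue

pipe.direct_recv_queue = recv_queue
pipe.send_message(["direct"])          # goes straight to the peer's recv queue

puts "pipe queue: #{pipe_queue.size}, recv queue: #{recv_queue.size}"
```

Once the direct queue is installed, nothing further lands in the pipe queue, so no pump task is needed to move messages along.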
Instance Attribute Summary
- #direct_recv_queue ⇒ Async::LimitedQueue?
  When set, #send_message enqueues directly here instead of using the internal queue.
- #direct_recv_transform ⇒ Proc?
  Optional transform applied before enqueuing into #direct_recv_queue.
- #peer ⇒ DirectPipe?
  The other end of this pipe pair.
- #peer_identity ⇒ String (readonly)
  Peer’s identity.
- #peer_socket_type ⇒ String (readonly)
  Peer’s socket type.
Instance Method Summary
- #close ⇒ void
  Closes this pipe end and sends a nil sentinel to the peer.
- #encrypted? ⇒ Boolean
  Always false; inproc pipes are never encrypted.
- #flush ⇒ nil
  No-op; inproc has no IO buffer to flush.
- #initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe (constructor)
  A new instance of DirectPipe.
- #read_frame ⇒ Protocol::ZMTP::Codec::Frame
  Reads one command frame from the internal command queue.
- #receive_message ⇒ Array<String>
  Receives a multi-frame message.
- #send_command(command) ⇒ Object
  Sends a command via the internal command queue.
- #send_message(parts) ⇒ void (also: #write_message)
  Sends a multi-frame message.
- #write_messages(messages) ⇒ void
  Batched form, for parity with Protocol::ZMTP::Connection.
Constructor Details
#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe
Returns a new instance of DirectPipe.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 51

    def initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:)
      @send_queue = send_queue
      @receive_queue = receive_queue
      @peer_identity = peer_identity || "".b
      @peer_socket_type = peer_type
      @closed = false
      @peer = nil
      @direct_recv_queue = nil
      @direct_recv_transform = nil
      @pending_direct = nil
    end
Instance Attribute Details
#direct_recv_queue ⇒ Async::LimitedQueue?
Returns the queue that #send_message enqueues into directly, when set, instead of using the internal queue.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 36

    def direct_recv_queue
      @direct_recv_queue
    end
#direct_recv_transform ⇒ Proc?
Returns optional transform applied before enqueuing into #direct_recv_queue.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 42

    def direct_recv_transform
      @direct_recv_transform
    end
#peer ⇒ DirectPipe?
Returns the other end of this pipe pair.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 30

    def peer
      @peer
    end
#peer_identity ⇒ String (readonly)
Returns peer’s identity.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 25

    def peer_identity
      @peer_identity
    end
#peer_socket_type ⇒ String (readonly)
Returns peer’s socket type.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 20

    def peer_socket_type
      @peer_socket_type
    end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes this pipe end and sends a nil sentinel to the peer.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 182

    def close
      return if @closed
      @closed = true
      @send_queue&.enqueue(nil) # close sentinel
    end
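To make the sentinel handshake concrete, here is a minimal sketch of how a nil on the queue turns into an EOFError on the reading side. `ClosingEnd` and `dequeue_or_eof` are hypothetical names, stdlib `Queue` stands in for the Async queue, and the sketch assumes that one end's send queue is the same object as the peer's receive queue.

```ruby
# Hypothetical stand-in for one pipe end (assumption, not the library class).
class ClosingEnd
  def initialize(send_queue)
    @send_queue = send_queue
    @closed = false
  end

  # Idempotent: only the first call enqueues the nil sentinel.
  def close
    return if @closed
    @closed = true
    @send_queue&.push(nil)
  end
end

# Hypothetical helper mirroring the nil check in #receive_message and #read_frame.
def dequeue_or_eof(queue)
  item = queue.pop
  raise EOFError, "connection closed" if item.nil?
  item
end

shared = Queue.new               # assumed: sender's send queue == peer's receive queue
sender = ClosingEnd.new(shared)
shared.push(["last message"])
sender.close
sender.close                     # second call does nothing

puts dequeue_or_eof(shared).inspect
begin
  dequeue_or_eof(shared)
rescue EOFError => e
  puts "peer saw: #{e.message}"
end
```

Messages enqueued before the sentinel are still delivered in order; the reader only sees the close once it reaches the nil.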
#encrypted? ⇒ Boolean
Always returns false; inproc pipes are never encrypted.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 113

    def encrypted?
      false
    end
#flush ⇒ nil
No-op — inproc has no IO buffer to flush.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 122

    def flush
      nil
    end
#read_frame ⇒ Protocol::ZMTP::Codec::Frame
Reads one command frame from the internal command queue. Used by PUB/XPUB subscription listeners.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 166

    def read_frame
      loop do
        item = @receive_queue.dequeue
        raise EOFError, "connection closed" if item.nil?
        if item.is_a?(Array) && item.first == :command
          return Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
        end
      end
    end
#receive_message ⇒ Array<String>
Receives a multi-frame message.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 132

    def receive_message
      loop do
        item = @receive_queue.dequeue
        raise EOFError, "connection closed" if item.nil?

        if item.is_a?(Array) && item.first == :command
          if block_given?
            yield Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
          end
          next
        end

        return item
      end
    end
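The loop above interleaves two kinds of queue items: `[:command, ...]` tuples, which are handed to the optional block and skipped, and plain message arrays, which are returned. A minimal sketch of that behaviour follows; `receive_from` is a hypothetical free function, stdlib `Queue` replaces the Async queue, and commands are yielded as raw objects rather than wrapped in Protocol::ZMTP::Codec::Frame.

```ruby
# Hypothetical helper mirroring the control flow of #receive_message.
def receive_from(queue)
  loop do
    item = queue.pop
    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      yield item[1] if block_given?   # surface the command, then keep reading
      next
    end

    return item                       # first non-command item is the message
  end
end

queue = Queue.new
queue.push([:command, "SUBSCRIBE topic"])   # illustrative command payload
queue.push(["frame-1", "frame-2"])

commands = []
message = receive_from(queue) { |cmd| commands << cmd }

puts "commands: #{commands.inspect}"
puts "message:  #{message.inspect}"
```

Without a block, commands are silently dropped and the caller only ever sees messages.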
#send_command(command) ⇒ Object
Sends a command via the internal command queue. Only available for PUB/SUB-family pipes.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 155

    def send_command(command)
      raise IOError, "closed" if @closed
      @send_queue.enqueue([:command, command])
    end
#send_message(parts) ⇒ void Also known as: write_message
This method returns an undefined value.
Sends a multi-frame message.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 84

    def send_message(parts)
      raise IOError, "closed" if @closed
      if @direct_recv_queue
        @direct_recv_queue.enqueue(apply_transform(parts))
      elsif @send_queue
        @send_queue.enqueue(parts)
      else
        (@pending_direct ||= []) << apply_transform(parts)
      end
    end
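The listing has three destinations: the direct recv queue, the send queue, or a pending buffer when neither exists yet. The sketch below walks through them under stated assumptions: `DispatchSketch` is a hypothetical class, stdlib `Queue` replaces the Async queue, and `apply_transform` is assumed to call #direct_recv_transform when one is set and pass the parts through otherwise (its body is not shown in this section, nor is how the pending buffer is later drained).

```ruby
# Hypothetical model of the three-way dispatch (assumption, not the library class).
class DispatchSketch
  attr_accessor :direct_recv_queue, :direct_recv_transform
  attr_reader :pending_direct

  def initialize(send_queue: nil)
    @send_queue = send_queue
    @direct_recv_queue = nil
    @direct_recv_transform = nil
    @pending_direct = nil
    @closed = false
  end

  def close
    @closed = true
  end

  def send_message(parts)
    raise IOError, "closed" if @closed
    if @direct_recv_queue
      @direct_recv_queue.push(apply_transform(parts))
    elsif @send_queue
      @send_queue.push(parts)
    else
      (@pending_direct ||= []) << apply_transform(parts)
    end
  end

  private

  # Assumed behaviour: call the transform if present, else pass through.
  def apply_transform(parts)
    @direct_recv_transform ? @direct_recv_transform.call(parts) : parts
  end
end

pipe = DispatchSketch.new                      # neither queue is set yet
pipe.direct_recv_transform = ->(parts) { parts.map(&:upcase) }
pipe.send_message(["hello"])                   # buffered, already transformed

pipe.direct_recv_queue = Queue.new
pipe.send_message(["world"])                   # enqueued directly, transformed

puts "pending: #{pipe.pending_direct.inspect}"
puts "direct:  #{pipe.direct_recv_queue.pop.inspect}"
```

Note that in the listing the transform is applied on the direct and pending paths only; messages that go through the send queue are enqueued untouched.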
#write_messages(messages) ⇒ void
This method returns an undefined value.
Batched form, for parity with Protocol::ZMTP::Connection. The work-stealing pumps call this when they dequeue more than one message at once; DirectPipe just loops — no mutex to amortize.
    # File 'lib/omq/transport/inproc/direct_pipe.rb', line 106

    def write_messages(messages)
      messages.each { |parts| send_message(parts) }
    end