Class: OMQ::Transport::Inproc::DirectPipe
- Inherits:
- Object
  - Object
  - OMQ::Transport::Inproc::DirectPipe
- Defined in:
- lib/omq/transport/inproc/direct_pipe.rb
Overview
A direct in-process pipe that transfers Ruby arrays through queues.
Implements the same interface as Connection so routing strategies can use it transparently.
When a routing strategy sets #direct_recv_queue on a pipe, #send_message enqueues directly into the peer’s recv queue, bypassing the intermediate pipe queues and the recv pump task. This reduces the inproc path from 3 queue hops to 2 (send_queue → recv_queue), eliminating the internal pipe queue in between.
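The bypass described above can be sketched with plain Ruby objects. This is a minimal illustration, not the library's implementation: `HopQueue` and `HopPipe` are hypothetical stand-ins (the real attribute holds an `Async::LimitedQueue`), and only the routing decision inside `send_message` is modeled.

```ruby
# Hypothetical stand-in for Async::LimitedQueue: only enqueue/dequeue/size.
class HopQueue
  def initialize
    @items = []
  end

  def enqueue(item)
    @items << item
  end

  def dequeue
    @items.shift
  end

  def size
    @items.size
  end
end

# Hypothetical, stripped-down pipe that models only where a message lands.
class HopPipe
  attr_accessor :direct_recv_queue

  def initialize(send_queue:)
    @send_queue = send_queue
    @direct_recv_queue = nil
  end

  def send_message(parts)
    if @direct_recv_queue
      @direct_recv_queue.enqueue(parts) # straight into the peer's recv queue
    else
      @send_queue.enqueue(parts)        # internal pipe queue, drained later by a pump
    end
  end
end

pipe_queue = HopQueue.new
recv_queue = HopQueue.new
pipe = HopPipe.new(send_queue: pipe_queue)

pipe.send_message(["before"])        # lands in the internal pipe queue
pipe.direct_recv_queue = recv_queue  # what a routing strategy would do
pipe.send_message(["after"])         # skips the internal pipe queue
```

Once the direct queue is set, nothing further is added to the internal pipe queue, which is the hop the overview says is eliminated.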
Instance Attribute Summary
-
#direct_recv_queue ⇒ Async::LimitedQueue?
When set, #send_message enqueues directly here instead of using the internal queue.
-
#direct_recv_transform ⇒ Proc?
Optional transform applied before enqueuing into #direct_recv_queue.
-
#peer ⇒ DirectPipe?
The other end of this pipe pair.
-
#peer_identity ⇒ String
readonly
Peer’s identity.
-
#peer_socket_type ⇒ String
readonly
Peer’s socket type.
Instance Method Summary
-
#close ⇒ void
Closes this pipe end and sends a nil sentinel to the peer.
-
#encrypted? ⇒ Boolean
Always false; inproc pipes are never encrypted.
-
#flush ⇒ nil
No-op — inproc has no IO buffer to flush.
-
#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe
constructor
A new instance of DirectPipe.
-
#peer_major ⇒ Integer
Always 3 — inproc peers are OMQ.
-
#peer_minor ⇒ Integer
Always 1 — inproc peers are OMQ (ZMTP 3.1).
-
#read_frame ⇒ Protocol::ZMTP::Codec::Frame
Reads one frame.
-
#receive_message ⇒ Array<String>
Receives a multi-frame message.
-
#send_command(command) ⇒ Object
Sends a command via the internal command queue.
-
#send_message(parts) ⇒ void
(also: #write_message)
Sends a multi-frame message.
-
#write_messages(messages) ⇒ void
Batched form, for parity with Protocol::ZMTP::Connection.
Constructor Details
#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe
Returns a new instance of DirectPipe.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 65

def initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:)
  @send_queue = send_queue
  @receive_queue = receive_queue
  @peer_identity = peer_identity || "".b
  @peer_socket_type = peer_type
  @closed = false
  @peer = nil
  @direct_recv_queue = nil
  @direct_recv_transform = nil
  @pending_direct = nil
end
Instance Attribute Details
#direct_recv_queue ⇒ Async::LimitedQueue?
Returns the queue that #send_message enqueues into directly, when set, instead of using the internal queue.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 50

def direct_recv_queue
  @direct_recv_queue
end
#direct_recv_transform ⇒ Proc?
Returns the optional transform applied before enqueuing into #direct_recv_queue.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 56

def direct_recv_transform
  @direct_recv_transform
end
#peer ⇒ DirectPipe?
Returns the other end of this pipe pair.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 44

def peer
  @peer
end
#peer_identity ⇒ String (readonly)
Returns the peer’s identity.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 39

def peer_identity
  @peer_identity
end
#peer_socket_type ⇒ String (readonly)
Returns the peer’s socket type.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 20

def peer_socket_type
  @peer_socket_type
end
Instance Method Details
#close ⇒ void
This method returns an undefined value.
Closes this pipe end and sends a nil sentinel to the peer.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 197

def close
  return if @closed
  @closed = true
  @send_queue&.enqueue(nil) # close sentinel
end
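To illustrate how the nil sentinel reaches the other end, here is a minimal sketch of a pipe pair built from two crossed queues. `SentinelQueue` and `PipeEndSketch` are hypothetical stand-ins written for this example; they copy the `close`, send, and receive logic shown in this page but leave out commands and the direct-queue path.

```ruby
# Hypothetical stand-in queue. Unlike a real blocking queue, dequeue on an
# empty queue returns nil here, so the sketch only dequeues what was enqueued.
class SentinelQueue
  def initialize
    @items = []
  end

  def enqueue(item)
    @items << item
  end

  def dequeue
    @items.shift
  end

  def size
    @items.size
  end
end

# Hypothetical pipe end with only close/send/receive.
class PipeEndSketch
  def initialize(send_queue:, receive_queue:)
    @send_queue = send_queue
    @receive_queue = receive_queue
    @closed = false
  end

  def close
    return if @closed           # a second close is a no-op
    @closed = true
    @send_queue&.enqueue(nil)   # close sentinel for the peer
  end

  def send_message(parts)
    raise IOError, "closed" if @closed
    @send_queue.enqueue(parts)
  end

  def receive_message
    item = @receive_queue.dequeue
    raise EOFError, "connection closed" if item.nil?
    item
  end
end

a_to_b = SentinelQueue.new
b_to_a = SentinelQueue.new
left  = PipeEndSketch.new(send_queue: a_to_b, receive_queue: b_to_a)
right = PipeEndSketch.new(send_queue: b_to_a, receive_queue: a_to_b)

left.send_message(["last words"])
left.close

right.receive_message # the message sent before close is still delivered
begin
  right.receive_message
rescue EOFError => e
  puts "peer closed: #{e.message}"
end
```

Because the sentinel travels through the same queue as the messages, anything sent before `close` is delivered first and the peer sees the end of the stream only afterwards.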
#encrypted? ⇒ Boolean
Always returns false; inproc pipes are never encrypted.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 127

def encrypted?
  false
end
#flush ⇒ nil
No-op — inproc has no IO buffer to flush.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 136

def flush
  nil
end
#peer_major ⇒ Integer
Always returns 3; inproc peers are OMQ.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 25

def peer_major
  3
end
#peer_minor ⇒ Integer
Always returns 1; inproc peers are OMQ (ZMTP 3.1).
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 32

def peer_minor
  1
end
#read_frame ⇒ Protocol::ZMTP::Codec::Frame
Reads one frame. Used by PUB/XPUB subscription listeners, which must see both the legacy message-form subscription (ZMTP 3.0) and the command-form (ZMTP 3.1).
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 181

def read_frame
  item = @receive_queue.dequeue
  raise EOFError, "connection closed" if item.nil?

  if item.is_a?(Array) && item.first == :command
    Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
  else
    Protocol::ZMTP::Codec::Frame.new(item.first || "".b)
  end
end
#receive_message ⇒ Array<String>
Receives a multi-frame message.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 146

def receive_message
  loop do
    item = @receive_queue.dequeue
    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      if block_given?
        yield Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
      end
      next
    end

    return item
  end
end
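The loop above skips command entries and hands them to an optional block, returning only the first real message. A small sketch of that control flow follows, under stated assumptions: `SketchFrame` and `SketchCommand` are hypothetical stand-ins for `Protocol::ZMTP::Codec::Frame` and a command object responding to `to_body`, and `ReceiveSketch` reads from a preloaded array instead of a queue.

```ruby
# Hypothetical stand-ins for the frame and command types used by the pipe.
SketchFrame = Struct.new(:body, :command)
SketchCommand = Struct.new(:name) do
  def to_body
    name
  end
end

# Hypothetical receiver that mirrors the receive_message loop shown above.
class ReceiveSketch
  def initialize(items)
    @items = items.dup
  end

  def receive_message
    loop do
      item = @items.shift
      raise EOFError, "connection closed" if item.nil?

      if item.is_a?(Array) && item.first == :command
        # Commands are surfaced to the block (if any) and never returned.
        yield SketchFrame.new(item[1].to_body, true) if block_given?
        next
      end

      return item
    end
  end
end

pipe = ReceiveSketch.new([
  [:command, SketchCommand.new("SUBSCRIBE")], # tagged the way send_command tags it
  ["topic", "payload"],
  nil                                         # close sentinel
])

commands = []
message = pipe.receive_message { |frame| commands << frame.body }
# message holds the two-part message; commands holds the one command body
```

Calling `receive_message` without a block drops interleaved commands silently, so a caller that cares about them has to pass a block.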
#send_command(command) ⇒ Object
Sends a command via the internal command queue. Only available for PUB/SUB-family pipes.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 169

def send_command(command)
  raise IOError, "closed" if @closed
  @send_queue.enqueue([:command, command])
end
#send_message(parts) ⇒ void Also known as: write_message
This method returns an undefined value.
Sends a multi-frame message.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 98

def send_message(parts)
  raise IOError, "closed" if @closed
  if @direct_recv_queue
    @direct_recv_queue.enqueue(apply_transform(parts))
  elsif @send_queue
    @send_queue.enqueue(parts)
  else
    (@pending_direct ||= []) << apply_transform(parts)
  end
end
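The three branches above can be exercised in isolation with the sketch below. Several parts are assumptions rather than documented behavior: the body of `apply_transform` is not shown on this page, so the version here (call the transform if one is set, otherwise pass the parts through) is a guess; `RoutingSketch` and `ListQueue` are hypothetical; and `pending_direct` is exposed as a reader only so the buffered messages can be inspected. How the real class later drains the pending buffer is not shown in the source and is not modeled.

```ruby
# Hypothetical stand-in queue that records what was enqueued.
class ListQueue
  attr_reader :items

  def initialize
    @items = []
  end

  def enqueue(item)
    @items << item
  end
end

# Hypothetical pipe that copies the send_message branching shown above.
class RoutingSketch
  attr_accessor :direct_recv_queue, :direct_recv_transform
  attr_reader :pending_direct # exposed for inspection in this sketch only

  def initialize(send_queue: nil)
    @send_queue = send_queue
    @closed = false
    @direct_recv_queue = nil
    @direct_recv_transform = nil
    @pending_direct = nil
  end

  def send_message(parts)
    raise IOError, "closed" if @closed
    if @direct_recv_queue
      @direct_recv_queue.enqueue(apply_transform(parts))
    elsif @send_queue
      @send_queue.enqueue(parts)
    else
      (@pending_direct ||= []) << apply_transform(parts)
    end
  end

  private

  # Assumption: the real helper is not shown in the source.
  def apply_transform(parts)
    @direct_recv_transform ? @direct_recv_transform.call(parts) : parts
  end
end

pipe = RoutingSketch.new # no send_queue and no direct queue yet
pipe.direct_recv_transform = ->(parts) { parts.map(&:upcase) }

pipe.send_message(["held"])   # third branch: buffered, already transformed
pipe.direct_recv_queue = ListQueue.new
pipe.send_message(["direct"]) # first branch: transformed, enqueued directly
```

Note that the `send_queue` branch enqueues the parts untransformed; in the code shown, the transform applies only to the direct and pending paths.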
#write_messages(messages) ⇒ void
This method returns an undefined value.
Batched form, for parity with Protocol::ZMTP::Connection. The work-stealing pumps call this when they dequeue more than one message at once; DirectPipe just loops — no mutex to amortize.
# File 'lib/omq/transport/inproc/direct_pipe.rb', line 120

def write_messages(messages)
  messages.each { |parts| send_message(parts) }
end