Class: OMQ::Transport::Inproc::DirectPipe

Inherits:
Object
Defined in:
lib/omq/transport/inproc/direct_pipe.rb

Overview

A direct in-process pipe that transfers Ruby arrays through queues.

Implements the same interface as Connection so routing strategies can use it transparently.

When a routing strategy sets #direct_recv_queue on a pipe, #send_message enqueues directly into the peer’s recv queue, bypassing the intermediate pipe queues and the recv pump task. This reduces inproc from 3 queue hops to 2 (send_queue → recv_queue), eliminating the internal pipe queue in between.
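The difference between the pumped path and the direct path can be sketched with plain queues. This is an illustration only, not the OMQ implementation: Ruby's built-in Queue (with push/pop) stands in for the Async queues (which use enqueue/dequeue), and the "pump" is an inline loop rather than an Async task.

```ruby
# Sketch only: stand-in queues, hypothetical variable names.
pipe_queue = Queue.new # intermediate pipe queue
recv_queue = Queue.new # peer socket's recv queue

# Pumped path: the sender writes into the pipe queue, and a recv pump
# has to move each message on to the recv queue.
pipe_queue.push(["hello"])
recv_queue.push(pipe_queue.pop) until pipe_queue.empty?
via_pump = recv_queue.pop

# Direct path: the sender writes straight into the recv queue, so the
# pipe queue and the pump are never involved.
recv_queue.push(["hello"])
direct = recv_queue.pop
```

Both paths deliver the same array; the direct path simply skips one enqueue/dequeue pair and the task that performs it.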

Constructor Details

#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe

Returns a new instance of DirectPipe.

Parameters:

  • send_queue (Async::Queue, nil) (defaults to: nil)

    outgoing command queue (nil for non-PUB/SUB types that don’t exchange commands)

  • receive_queue (Async::Queue, nil) (defaults to: nil)

    incoming command queue

  • peer_identity (String)
  • peer_type (String)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 51

def initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:)
  @send_queue            = send_queue
  @receive_queue         = receive_queue
  @peer_identity         = peer_identity || "".b
  @peer_socket_type      = peer_type
  @closed                = false
  @peer                  = nil
  @direct_recv_queue     = nil
  @direct_recv_transform = nil
  @pending_direct        = nil
end

Instance Attribute Details

#direct_recv_queue ⇒ Async::LimitedQueue?

Returns the queue that #send_message enqueues into directly when set, bypassing the internal queue.

Returns:

  • (Async::LimitedQueue, nil)

    when set, #send_message enqueues directly here instead of using the internal queue



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 36

def direct_recv_queue
  @direct_recv_queue
end

#direct_recv_transform ⇒ Proc?

Returns the optional transform applied before enqueuing into #direct_recv_queue.

Returns:

  • (Proc, nil)

    optional transform applied before enqueuing into #direct_recv_queue

# File 'lib/omq/transport/inproc/direct_pipe.rb', line 42

def direct_recv_transform
  @direct_recv_transform
end

#peer ⇒ DirectPipe?

Returns the other end of this pipe pair.

Returns:

  • (DirectPipe, nil)

    the other end of this pipe pair



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 30

def peer
  @peer
end

#peer_identity ⇒ String (readonly)

Returns peer’s identity.

Returns:

  • (String)

    peer’s identity



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 25

def peer_identity
  @peer_identity
end

#peer_socket_type ⇒ String (readonly)

Returns peer’s socket type.

Returns:

  • (String)

    peer’s socket type



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 20

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes this pipe end and enqueues a nil sentinel on the send queue so the peer sees the close. Calling it again is a no-op.



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 182

def close
  return if @closed
  @closed = true
  @send_queue&.enqueue(nil) # close sentinel
end
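
The close sentinel can be illustrated in isolation. The sketch below is a stand-in, not the OMQ class: SketchEnd is a hypothetical name, and Ruby's built-in Queue (push/pop) replaces Async::Queue (enqueue/dequeue).

```ruby
# Sketch only: hypothetical stand-in for one end of a pipe.
class SketchEnd
  def initialize(send_queue)
    @send_queue = send_queue
    @closed     = false
  end

  def close
    return if @closed
    @closed = true
    @send_queue&.push(nil) # close sentinel, sent at most once
  end
end

queue    = Queue.new
pipe_end = SketchEnd.new(queue)
pipe_end.close
pipe_end.close             # second call returns early, no second sentinel
sentinel_count = queue.size

SketchEnd.new(nil).close   # no send queue: safe navigation skips the push
```

The @closed guard is what keeps the peer from seeing more than one sentinel, and the safe-navigation call covers pipes built without a send queue.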

#encrypted? ⇒ Boolean

Always returns false; inproc pipes are never encrypted.

Returns:

  • (Boolean)

    always false; inproc pipes are never encrypted



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 113

def encrypted?
  false
end

#flush ⇒ nil

No-op — inproc has no IO buffer to flush.

Returns:

  • (nil)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 122

def flush
  nil
end

#read_frame ⇒ Protocol::ZMTP::Codec::Frame

Reads one command frame from the internal command queue, discarding any non-command items dequeued along the way. Used by PUB/XPUB subscription listeners.

Returns:

  • (Protocol::ZMTP::Codec::Frame)

Raises:

  • (EOFError)

    if closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 166

def read_frame
  loop do
    item = @receive_queue.dequeue
    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      return Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
    end
  end
end

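#receive_message ⇒ Array<String>

Receives a multi-frame message. Command items dequeued ahead of it are yielded to the block as command frames if a block is given, and skipped otherwise.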

Returns:

  • (Array<String>)

Raises:

  • (EOFError)

    if closed



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 132

def receive_message
  loop do
    item = @receive_queue.dequeue

    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      if block_given?
        yield Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
      end
      next
    end

    return item
  end
end
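
How commands, data messages, and the close sentinel share one queue can be sketched as follows. Everything here is a stand-in: SketchFrame and SketchCommand are hypothetical substitutes for Protocol::ZMTP::Codec::Frame and Protocol::ZMTP::Codec::Command, receive_from is a free function rather than the real method, and Ruby's built-in Queue (pop) replaces Async::Queue (dequeue).

```ruby
# Sketch only: hypothetical stand-ins for the real frame and command classes.
SketchFrame   = Struct.new(:body, :command)
SketchCommand = Struct.new(:name) do
  def to_body
    name
  end
end

# Mirrors the loop in the listing above against a plain Queue.
def receive_from(queue)
  loop do
    item = queue.pop
    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      # Commands never reach the caller as messages; they are only yielded.
      yield SketchFrame.new(item[1].to_body, true) if block_given?
      next
    end

    return item
  end
end

queue = Queue.new
queue.push([:command, SketchCommand.new("SUBSCRIBE")])
queue.push(["topic", "payload"])
queue.push(nil) # close sentinel

seen    = []
message = receive_from(queue) { |frame| seen << frame.body }

closed = begin
  receive_from(queue)
  false
rescue EOFError
  true
end
```

The first call hands the command to the block and returns the data message behind it; the second call hits the nil sentinel and raises EOFError.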

#send_command(command) ⇒ Object

Sends a command via the internal command queue. Only available for PUB/SUB-family pipes.

Parameters:

  • command (Protocol::ZMTP::Codec::Command)

Raises:

  • (IOError)

    if the pipe is closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 155

def send_command(command)
  raise IOError, "closed" if @closed
  @send_queue.enqueue([:command, command])
end

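#send_message(parts) ⇒ void

Also known as: write_message

This method returns an undefined value.

Sends a multi-frame message. The message goes to #direct_recv_queue when one is set, otherwise to the send queue; if neither is available, it is held in an internal pending buffer.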

Parameters:

  • parts (Array<String>)

Raises:

  • (IOError)

    if the pipe is closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 84

def send_message(parts)
  raise IOError, "closed" if @closed
  if @direct_recv_queue
    @direct_recv_queue.enqueue(apply_transform(parts))
  elsif @send_queue
    @send_queue.enqueue(parts)
  else
    (@pending_direct ||= []) << apply_transform(parts)
  end
end
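
The branches above can be exercised with a small stand-in. SketchSendPipe is a hypothetical class, not the OMQ one, and its apply_transform is an assumption: the real private helper is not shown in this listing, so the sketch simply calls the transform when one is set and passes the parts through otherwise. How the real class drains the pending buffer is also not shown, so the sketch only exposes the buffer for inspection. Ruby's built-in Queue (push/pop) replaces the Async queues.

```ruby
# Sketch only: hypothetical stand-in for the send side of a pipe.
class SketchSendPipe
  attr_accessor :direct_recv_queue, :direct_recv_transform
  attr_reader :pending_direct

  def initialize(send_queue: nil)
    @send_queue            = send_queue
    @direct_recv_queue     = nil
    @direct_recv_transform = nil
    @pending_direct        = nil
  end

  def send_message(parts)
    if @direct_recv_queue
      @direct_recv_queue.push(apply_transform(parts))
    elsif @send_queue
      @send_queue.push(parts)
    else
      (@pending_direct ||= []) << apply_transform(parts)
    end
  end

  private

  # Assumed behavior: apply the transform if present, else pass through.
  def apply_transform(parts)
    @direct_recv_transform ? @direct_recv_transform.call(parts) : parts
  end
end

pipe = SketchSendPipe.new           # no send queue, no direct queue yet
pipe.send_message(["early"])        # lands in the pending buffer

pipe.direct_recv_transform = ->(parts) { parts.map(&:upcase) }
pipe.direct_recv_queue     = Queue.new
pipe.send_message(["late"])         # transformed, then enqueued directly
delivered = pipe.direct_recv_queue.pop
```

Note that the send-queue branch enqueues the parts untransformed, while both the direct and the pending branches apply the transform first.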

#write_messages(messages) ⇒ void

This method returns an undefined value.

Batched form, for parity with Protocol::ZMTP::Connection. The work-stealing pumps call this when they dequeue more than one message at once; DirectPipe just loops — no mutex to amortize.

Parameters:

  • messages (Array<Array<String>>)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 106

def write_messages(messages)
  messages.each { |parts| send_message(parts) }
end