Class: OMQ::Transport::Inproc::DirectPipe

Inherits:
Object
Defined in:
lib/omq/transport/inproc/direct_pipe.rb

Overview

A direct in-process pipe that transfers Ruby arrays through queues.

Implements the same interface as Connection so routing strategies can use it transparently.

When a routing strategy sets #direct_recv_queue on a pipe, #send_message enqueues directly into the peer’s recv queue, bypassing the intermediate pipe queues and the recv pump task. This reduces inproc delivery from 3 queue hops to 2 (send_queue → recv_queue) by eliminating the internal pipe queue in between.
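
The hop count described above can be illustrated with a minimal sketch. It uses Ruby's stdlib Thread::Queue as a stand-in for the Async queues, and the `relay` helper and the `pipe_queue` name are hypothetical, introduced only to model a pump moving one item between queues; this is not the library's implementation.

```ruby
# Stand-in queues (assumption: the real class uses Async queues).
send_queue = Thread::Queue.new # sender-side queue
pipe_queue = Thread::Queue.new # internal pipe queue (the hop being removed)
recv_queue = Thread::Queue.new # receiver-side queue

# Hypothetical helper modelling a pump that moves one item between queues.
def relay(from, to)
  to.push(from.pop)
end

# Queued path: the message passes through three queues.
send_queue.push(["hello"])
relay(send_queue, pipe_queue) # send pump
relay(pipe_queue, recv_queue) # recv pump
three_hop = recv_queue.pop

# Direct path: the message goes from send_queue straight into recv_queue.
send_queue.push(["hello"])
relay(send_queue, recv_queue)
two_hop = recv_queue.pop
```

Both paths deliver the same array; the direct path never touches `pipe_queue` and needs no second pump.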

Constructor Details

#initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:) ⇒ DirectPipe

Returns a new instance of DirectPipe.

Parameters:

  • send_queue (Async::Queue, nil) (defaults to: nil)

    outgoing command queue (nil for non-PUB/SUB types that don’t exchange commands)

  • receive_queue (Async::Queue, nil) (defaults to: nil)

    incoming command queue

  • peer_identity (String)
  • peer_type (String)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 65

def initialize(send_queue: nil, receive_queue: nil, peer_identity:, peer_type:)
  @send_queue            = send_queue
  @receive_queue         = receive_queue
  @peer_identity         = peer_identity || "".b
  @peer_socket_type      = peer_type
  @closed                = false
  @peer                  = nil
  @direct_recv_queue     = nil
  @direct_recv_transform = nil
  @pending_direct        = nil
end

Instance Attribute Details

#direct_recv_queue ⇒ Async::LimitedQueue?

Returns the queue that #send_message enqueues into directly, when set, instead of using the internal queue.

Returns:

  • (Async::LimitedQueue, nil)

    when set, #send_message enqueues directly here instead of using the internal queue



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 50

def direct_recv_queue
  @direct_recv_queue
end

#direct_recv_transform ⇒ Proc?

Returns optional transform applied before enqueuing into #direct_recv_queue.

Returns:

  • (Proc, nil)

    optional transform applied before enqueuing into #direct_recv_queue


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 56

def direct_recv_transform
  @direct_recv_transform
end

#peer ⇒ DirectPipe?

Returns the other end of this pipe pair.

Returns:

  • (DirectPipe, nil)

    the other end of this pipe pair



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 44

def peer
  @peer
end

#peer_identity ⇒ String (readonly)

Returns peer’s identity.

Returns:

  • (String)

    peer’s identity



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 39

def peer_identity
  @peer_identity
end

#peer_socket_type ⇒ String (readonly)

Returns peer’s socket type.

Returns:

  • (String)

    peer’s socket type



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 20

def peer_socket_type
  @peer_socket_type
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.

Closes this pipe end and, if a send queue is present, enqueues a nil sentinel for the peer. Subsequent calls are no-ops.



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 197

def close
  return if @closed
  @closed = true
  @send_queue&.enqueue(nil) # close sentinel
end
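
A minimal sketch of the nil-sentinel close pattern follows. `SketchPipe` is a hypothetical stand-in, Thread::Queue replaces the Async queues, and the cross-wiring of the two ends (one end's send queue is the other's receive queue) is an assumption made for illustration.

```ruby
# Hypothetical stand-in for the close/receive halves of a pipe end.
class SketchPipe
  def initialize(send_queue:, receive_queue:)
    @send_queue    = send_queue
    @receive_queue = receive_queue
    @closed        = false
  end

  # Idempotent: only the first call pushes the sentinel.
  def close
    return if @closed
    @closed = true
    @send_queue&.push(nil) # close sentinel
  end

  # A nil item means the other end has closed.
  def receive_message
    item = @receive_queue.pop
    raise EOFError, "connection closed" if item.nil?
    item
  end
end

# Assumed wiring: the two ends share queues crosswise.
a_to_b = Thread::Queue.new
b_to_a = Thread::Queue.new
a = SketchPipe.new(send_queue: a_to_b, receive_queue: b_to_a)
b = SketchPipe.new(send_queue: b_to_a, receive_queue: a_to_b)

a.close
a.close # no-op, so no second sentinel is queued

closed_message = nil
begin
  b.receive_message
rescue EOFError => e
  closed_message = e.message
end
```

In this sketch the peer learns about the close only when it next dequeues, which is why the sentinel travels through the same queue as ordinary messages.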

#encrypted? ⇒ Boolean

Returns always false; inproc pipes are never encrypted.

Returns:

  • (Boolean)

    always false; inproc pipes are never encrypted



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 127

def encrypted?
  false
end

#flush ⇒ nil

No-op — inproc has no IO buffer to flush.

Returns:

  • (nil)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 136

def flush
  nil
end

#peer_major ⇒ Integer

Returns always 3 — inproc peers are OMQ.

Returns:

  • (Integer)

    always 3 — inproc peers are OMQ



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 25

def peer_major
  3
end

#peer_minor ⇒ Integer

Returns always 1 — inproc peers are OMQ (ZMTP 3.1).

Returns:

  • (Integer)

    always 1 — inproc peers are OMQ (ZMTP 3.1)



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 32

def peer_minor
  1
end

#read_frame ⇒ Protocol::ZMTP::Codec::Frame

Reads one frame. Used by PUB/XPUB subscription listeners, which must see both the legacy message-form subscription (ZMTP 3.0) and the command-form (ZMTP 3.1).

Returns:

  • (Protocol::ZMTP::Codec::Frame)

Raises:

  • (EOFError)

    if closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 181

def read_frame
  item = @receive_queue.dequeue
  raise EOFError, "connection closed" if item.nil?

  if item.is_a?(Array) && item.first == :command
    Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
  else
    Protocol::ZMTP::Codec::Frame.new(item.first || "".b)
  end
end

#receive_message ⇒ Array<String>

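Receives a multi-frame message. Command items found on the queue are yielded to the block (if one is given) as command frames and then skipped; the first non-command item is returned.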

Returns:

  • (Array<String>)

Raises:

  • (EOFError)

    if closed



# File 'lib/omq/transport/inproc/direct_pipe.rb', line 146

def receive_message
  loop do
    item = @receive_queue.dequeue

    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      if block_given?
        yield Protocol::ZMTP::Codec::Frame.new(item[1].to_body, command: true)
      end
      next
    end

    return item
  end
end
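
The command-skipping loop can be tried in isolation with the sketch below. `SketchFrame` is a hypothetical stand-in for Protocol::ZMTP::Codec::Frame, the command payload is a plain string rather than a real command object (so no `to_body` call), and Thread::Queue replaces the Async queue.

```ruby
# Hypothetical stand-in for a codec frame.
SketchFrame = Struct.new(:body, :command)

# Free-function version of the loop, taking the queue as an argument.
def sketch_receive_message(queue)
  loop do
    item = queue.pop
    raise EOFError, "connection closed" if item.nil?

    if item.is_a?(Array) && item.first == :command
      # Commands are surfaced to the caller's block, then skipped.
      yield SketchFrame.new(item[1], true) if block_given?
      next
    end

    return item # first non-command item is the message
  end
end

queue = Thread::Queue.new
queue.push([:command, "SUBSCRIBE topic"]) # command item
queue.push(["topic", "payload"])          # ordinary multi-frame message

seen = []
message = sketch_receive_message(queue) { |frame| seen << frame.body }
```

A caller that passes no block still receives the message; the command item is simply dropped.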

#send_command(command) ⇒ Object

Sends a command via the internal command queue. Only available for PUB/SUB-family pipes.

Parameters:

  • command (Protocol::ZMTP::Codec::Command)

Raises:

  • (IOError)

    if closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 169

def send_command(command)
  raise IOError, "closed" if @closed
  @send_queue.enqueue([:command, command])
end

#send_message(parts) ⇒ void

Also known as: write_message

This method returns an undefined value.

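Sends a multi-frame message. If #direct_recv_queue is set, the message is enqueued there after any transform is applied; otherwise it goes to the send queue if one exists, or, failing both, is appended to an internal pending buffer.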

Parameters:

  • parts (Array<String>)

Raises:

  • (IOError)

    if closed


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 98

def send_message(parts)
  raise IOError, "closed" if @closed
  if @direct_recv_queue
    @direct_recv_queue.enqueue(apply_transform(parts))
  elsif @send_queue
    @send_queue.enqueue(parts)
  else
    (@pending_direct ||= []) << apply_transform(parts)
  end
end
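
The three branches can be exercised with a small stand-in. `SketchSender` is hypothetical, Thread::Queue replaces the Async queues, and the body of `apply_transform` is an assumption (the private helper is not shown on this page): here it calls the transform when one is set and otherwise returns the parts unchanged. How the pending buffer is later drained is also not shown here, so the sketch only inspects it.

```ruby
# Hypothetical stand-in reproducing the dispatch in #send_message.
class SketchSender
  attr_accessor :direct_recv_queue, :direct_recv_transform
  attr_reader :pending_direct

  def initialize(send_queue: nil)
    @send_queue            = send_queue
    @closed                = false
    @direct_recv_queue     = nil
    @direct_recv_transform = nil
    @pending_direct        = nil
  end

  def send_message(parts)
    raise IOError, "closed" if @closed
    if @direct_recv_queue
      @direct_recv_queue.push(apply_transform(parts)) # direct path
    elsif @send_queue
      @send_queue.push(parts)                         # queued path
    else
      (@pending_direct ||= []) << apply_transform(parts) # buffered
    end
  end

  private

  # Assumed behavior: apply the transform only when one is set.
  def apply_transform(parts)
    @direct_recv_transform ? @direct_recv_transform.call(parts) : parts
  end
end

pipe = SketchSender.new       # no send queue, no direct queue yet
pipe.send_message(["early"])  # lands in the pending buffer

recv_queue = Thread::Queue.new
pipe.direct_recv_transform = ->(parts) { parts.map(&:upcase) }
pipe.direct_recv_queue     = recv_queue
pipe.send_message(["late"])   # goes straight to recv_queue, transformed
delivered = recv_queue.pop
```

Note that the transform is applied on the direct and buffered branches but not when the message goes through the send queue.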

#write_messages(messages) ⇒ void

This method returns an undefined value.

Batched form, for parity with Protocol::ZMTP::Connection. The work-stealing pumps call this when they dequeue more than one message at once; DirectPipe simply calls #send_message for each message, since there is no mutex to amortize.

Parameters:

  • messages (Array<Array<String>>)


# File 'lib/omq/transport/inproc/direct_pipe.rb', line 120

def write_messages(messages)
  messages.each { |parts| send_message(parts) }
end