Class: NNQ::Transport::Inproc::Pipe

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/transport/inproc/pipe.rb

Overview

Queue-based in-process pipe. Duck-types Connection so routing strategies, the recv loop, and the send pump work against it unchanged.

No wire framing: bodies are transferred as frozen Strings through a pair of Async::Queue (one per direction). When an SP backtrace header is supplied (REQ/REP/SURVEYOR paths), it’s prepended before enqueue so #receive_message returns an already-prefixed body — matching the TCP/IPC framing semantic so routing’s ‘parse_backtrace` parses the same layout either way.

Direct-recv fast path: when a routing strategy calls #wire_direct_recv on the peer side of a pipe pair, subsequent #send_message calls enqueue straight into the consumer’s recv queue — the intermediate pipe queue and the recv pump fiber are both skipped. Cuts three fiber hops to one and is what lets inproc PUSH/PULL clear 1M msg/s on YJIT.

Wiring happens synchronously inside connect (before the call returns to the caller), so there’s no window in which a send can precede a wire — no pending buffer needed.

Close protocol: #close enqueues a ‘nil` sentinel onto the send side (or the direct queue if wired). The peer’s recv loop sees ‘nil`, raises `EOFError`, and unwinds via its connection supervisor.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(send_queue:, recv_queue:, endpoint:) ⇒ Pipe

Returns a new instance of Pipe.



45
46
47
48
49
50
51
52
53
# File 'lib/nnq/transport/inproc/pipe.rb', line 45

def initialize(send_queue:, recv_queue:, endpoint:)
  @send_queue            = send_queue
  @recv_queue            = recv_queue
  @endpoint              = endpoint
  @closed                = false
  @peer                  = nil
  @direct_recv_queue     = nil
  @direct_recv_transform = nil
end

Instance Attribute Details

#direct_recv_queueAsync::Queue? (readonly)

Returns when non-nil, #send_message enqueues here instead of into @send_queue.

Returns:

  • (Async::Queue, nil)

    when non-nil, #send_message enqueues here instead of into @send_queue.



42
43
44
# File 'lib/nnq/transport/inproc/pipe.rb', line 42

def direct_recv_queue
  @direct_recv_queue
end

#endpointString? (readonly)

Returns endpoint URI this pipe was established on.

Returns:

  • (String, nil)

    endpoint URI this pipe was established on



35
36
37
# File 'lib/nnq/transport/inproc/pipe.rb', line 35

def endpoint
  @endpoint
end

#peerPipe?

Returns the other end of the pair.

Returns:

  • (Pipe, nil)

    the other end of the pair



38
39
40
# File 'lib/nnq/transport/inproc/pipe.rb', line 38

def peer
  @peer
end

Instance Method Details

#closeObject



119
120
121
122
123
124
125
126
# File 'lib/nnq/transport/inproc/pipe.rb', line 119

def close
  return if @closed
  @closed = true
  # Close sentinel goes on whichever queue the peer is reading.
  # When direct-wired, @send_queue is unused; hit the direct
  # queue so the consumer unblocks.
  (@direct_recv_queue || @send_queue).enqueue(nil)
end

#closed?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/nnq/transport/inproc/pipe.rb', line 114

def closed?
  @closed
end

#flushObject

No-op — Async::Queue has no IO buffer to flush.



102
103
104
# File 'lib/nnq/transport/inproc/pipe.rb', line 102

def flush
  nil
end

#receive_messageObject

Raises:

  • (EOFError)


107
108
109
110
111
# File 'lib/nnq/transport/inproc/pipe.rb', line 107

def receive_message
  item = @recv_queue.dequeue
  raise EOFError, "connection closed" if item.nil?
  item
end

#send_message(body, header: nil) ⇒ Object Also known as: write_message

Raises:



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/nnq/transport/inproc/pipe.rb', line 70

def send_message(body, header: nil)
  raise ClosedError, "connection closed" if @closed
  wire = header ? header + body : body

  if (q = @direct_recv_queue)
    item = @direct_recv_transform ? @direct_recv_transform.call(wire) : wire
    q.enqueue(item) unless item.nil?
  else
    @send_queue.enqueue(wire)
  end
end

#wire_direct_recv(queue, transform) ⇒ Object

Wires the direct-recv fast path. After this call, messages sent on this pipe bypass the intermediate pipe queue and land directly in queue.

Parameters:

  • queue (Async::Queue)
  • transform (Proc, nil)

    optional per-message transform; return nil to drop the message (used by filter/parse strategies like SUB or REP).



64
65
66
67
# File 'lib/nnq/transport/inproc/pipe.rb', line 64

def wire_direct_recv(queue, transform)
  @direct_recv_transform = transform
  @direct_recv_queue     = queue
end

#write_messages(bodies) ⇒ Object

Raises:



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/nnq/transport/inproc/pipe.rb', line 86

def write_messages(bodies)
  raise ClosedError, "connection closed" if @closed

  if (q = @direct_recv_queue)
    transform = @direct_recv_transform
    bodies.each do |body|
      item = transform ? transform.call(body) : body
      q.enqueue(item) unless item.nil?
    end
  else
    bodies.each { |body| @send_queue.enqueue(body) }
  end
end