Class: NNQ::Routing::Req

Inherits:
Object
Defined in:
lib/nnq/routing/req.rb

Overview

REQ: client side of req0/rep0.

Wire format: each message body on the wire is `[4-byte BE request_id][payload]`. The request id has the high bit set (`0x80000000..0xFFFFFFFF`), which is nng’s marker for “this is the last (deepest) word on the backtrace stack”. A direct REQ→REP exchange carries exactly one id.
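As a minimal sketch of the framing described above, the following packs and unpacks the 4-byte id header with plain Ruby. The helper names `encode_request` and `decode_reply` are hypothetical and not part of NNQ; they only mirror what `send_request` and `enqueue` do with the header.

```ruby
require "securerandom"

# Hypothetical helper: prepend the big-endian 32-bit request id to the payload.
def encode_request(id, payload)
  [id].pack("N") + payload.b
end

# Hypothetical helper: split a received body into [id, payload].
# Bodies shorter than the 4-byte header are rejected with nil.
def decode_reply(body)
  return nil if body.bytesize < 4

  [body.unpack1("N"), body.byteslice(4..)]
end

# Random id with the high bit forced on, so it falls in 0x80000000..0xFFFFFFFF.
id    = SecureRandom.random_number(0x80000000) | 0x80000000
frame = encode_request(id, "ping")

decoded_id, payload = decode_reply(frame)
```

Here `frame` is eight bytes long (four header bytes plus `"ping"`), and decoding it yields the same id and payload that went in.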

Semantics (cooked mode, what this implements):

  • At most one in-flight request per socket. Issuing a new send_request while a previous one is still waiting for its reply cancels the previous one: the blocked caller wakes up with an NNQ::RequestCancelled error and the late reply (if any) is silently dropped. This matches nng cooked req0, where a new nng_sendmsg abandons the prior request.

  • Reply is matched by id, NOT by pipe. Late or unmatched replies are silently dropped.

  • Round-robin peer selection, but no retry timer (real nng resends on a timer; we leave that to the user via timeouts).

  • Blocks waiting for a peer if no connection is currently up.
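Because there is no built-in retry timer, a caller that wants resends has to supply its own. The sketch below shows one possible approach using the standard library's `Timeout`; `request_with_retry` and `FlakySocket` are hypothetical names for illustration, and `Timeout` is an assumption here (code running under a fiber scheduler may prefer that scheduler's own timeout facility).

```ruby
require "timeout"

# Hypothetical helper: retry a blocking request a bounded number of times.
# `socket` is anything that responds to #send_request(body).
def request_with_retry(socket, body, attempts: 3, timeout: 1.0)
  attempts.times do |i|
    begin
      return Timeout.timeout(timeout) { socket.send_request(body) }
    rescue Timeout::Error
      # Give up after the last attempt; otherwise loop and resend.
      raise if i == attempts - 1
    end
  end
end

# Stand-in for a REQ socket: the first request never gets a reply,
# every later request is answered immediately.
class FlakySocket
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def send_request(body)
    @calls += 1
    sleep if @calls == 1 # simulate a lost reply by blocking forever
    "reply to #{body}"
  end
end

socket = FlakySocket.new
reply  = request_with_retry(socket, "ping", attempts: 3, timeout: 0.05)
```

With a real REQ socket, each retry is a fresh send_request with a new id, so a late reply to an earlier attempt would be dropped as an unmatched id rather than delivered twice.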

Constructor Details

#initialize(engine) ⇒ Req

Returns a new instance of Req.



# File 'lib/nnq/routing/req.rb', line 30

def initialize(engine)
  @engine      = engine
  @next_idx    = 0
  @mutex       = Mutex.new
  @outstanding = nil # [id, promise] or nil
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/req.rb', line 84

def close
  @mutex.synchronize do
    @outstanding&.last&.reject(NNQ::Error.new("REQ socket closed"))
    @outstanding = nil
  end
end

#enqueue(body, _conn) ⇒ Object

Called by the engine recv loop with each received message.



# File 'lib/nnq/routing/req.rb', line 70

def enqueue(body, _conn)
  return if body.bytesize < 4
  id      = body.unpack1("N")
  payload = body.byteslice(4..)

  @mutex.synchronize do
    if @outstanding && @outstanding[0] == id
      @outstanding[1].resolve(payload)
    end
    # Mismatched id → late/spurious reply, silently dropped.
  end
end

#send_request(body) ⇒ String

Sends body as a request and blocks until the matching reply comes back. Returns the reply payload (without the id header).

If another fiber issues a send_request while this call is waiting, this call raises NNQ::RequestCancelled.
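The cancellation rule can be modelled outside NNQ with a simplified, standard-library-only sketch. Everything here is an assumption for illustration: `Slot`, `await`, `deliver`, and `waiting?` are hypothetical names, a `Queue` stands in for the promise, threads stand in for fibers, and the local `RequestCancelled` class stands in for NNQ::RequestCancelled.

```ruby
# Stand-in for NNQ::RequestCancelled.
class RequestCancelled < StandardError; end

# Simplified model of the single outstanding-request slot.
class Slot
  def initialize
    @mutex       = Mutex.new
    @outstanding = nil # [id, queue] or nil
  end

  # Registers a request, cancelling any previous one, then blocks for the outcome.
  def await(id)
    queue = Queue.new
    @mutex.synchronize do
      @outstanding&.last&.push(RequestCancelled.new("cancelled by new request"))
      @outstanding = [id, queue]
    end
    outcome = queue.pop
    raise outcome if outcome.is_a?(Exception)

    outcome
  ensure
    # Only clear the slot if it still belongs to this request.
    @mutex.synchronize { @outstanding = nil if @outstanding && @outstanding[0] == id }
  end

  # Delivers a reply; a mismatched id is silently dropped.
  def deliver(id, payload)
    @mutex.synchronize do
      @outstanding[1].push(payload) if @outstanding && @outstanding[0] == id
    end
  end

  def waiting?(id)
    @mutex.synchronize { !@outstanding.nil? && @outstanding[0] == id }
  end
end

slot = Slot.new

first = Thread.new do
  slot.await(1)
rescue RequestCancelled => e
  e # hand the error back as the thread's value
end
sleep 0.01 until slot.waiting?(1)

second = Thread.new { slot.await(2) }
sleep 0.01 until slot.waiting?(2)

slot.deliver(1, "late") # dropped: id 1 is no longer outstanding
slot.deliver(2, "pong")

first_result  = first.value
second_result = second.value
```

The first waiter ends up with a `RequestCancelled` error and the second receives `"pong"`; the reply addressed to id 1 never reaches anyone.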

Parameters:

  • body (String)

Returns:

  • (String)


# File 'lib/nnq/routing/req.rb', line 46

def send_request(body)
  id      = SecureRandom.random_number(0x80000000) | 0x80000000
  promise = Async::Promise.new

  @mutex.synchronize do
    # Cancel any in-flight request — new send supersedes it.
    @outstanding&.last&.reject(RequestCancelled.new("cancelled by new send_request"))
    @outstanding = [id, promise]
  end

  conn   = pick_peer
  header = [id].pack("N")
  conn.send_message(header + body)
  promise.wait
ensure
  @mutex.synchronize do
    # Only clear the slot if it's still ours. If a concurrent
    # send_request already replaced it, leave the new entry alone.
    @outstanding = nil if @outstanding && @outstanding[0] == id
  end
end