Class: NNQ::Routing::Rep

Inherits:
Object
Includes:
Backtrace
Defined in:
lib/nnq/routing/rep.rb

Overview

REP: server side of req0/rep0.

Wire format: incoming bodies are a backtrace stack followed by the request payload. The backtrace is one or more 4-byte big-endian words; words are read off the front until one has the high bit of its top byte set (the original REQ’s request id, which terminates the stack). The whole backtrace is stashed and echoed verbatim on reply, prepended to the reply body. REP never reorders or rewrites the stack; it is a pure echo back to the originating pipe.
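The word-by-word scan described above can be sketched as follows. This is a minimal illustration, not the library's Backtrace#parse_backtrace, whose source is not shown here: the name split_backtrace is hypothetical, and counting the terminating request id toward the 8-word cap is an assumption.

```ruby
# Hypothetical helper for illustration; not the library's parse_backtrace.
MAX_HOPS = 8 # hop cap from the overview; assumed to include the request id word

# Splits a raw body into [backtrace, payload].
# Returns nil when the header is truncated or no terminator is found
# within MAX_HOPS words.
def split_backtrace(body)
  body = body.b # binary copy so offsets are byte offsets
  offset = 0
  MAX_HOPS.times do
    return nil if body.bytesize < offset + 4 # truncated header
    top_byte = body.getbyte(offset)          # first byte of a big-endian word
    offset += 4
    # A set high bit marks the request id, which ends the stack.
    if (top_byte & 0x80) != 0
      return [body.byteslice(0, offset), body.byteslice(offset, body.bytesize - offset)]
    end
  end
  nil # too many hops
end

hop    = [0x00000005].pack("N") # intermediate hop word (high bit clear)
req_id = [0x80000001].pack("N") # request id (high bit set)
btrace, payload = split_backtrace(hop + req_id + "ping")
```

Here btrace holds the two header words unchanged and payload holds the remaining bytes, so a reply can be framed as btrace + reply_body.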

Semantics (cooked mode):

  • At most one pending request at a time. Calling #receive while a previous request is pending silently discards that request — its backtrace is forgotten and any later #send_reply will target the new request. This matches nng cooked rep0, where nng_recvmsg after nng_recvmsg drops the earlier message.

  • Calling #send_reply with no pending request raises.

  • The reply must be routed back to the same pipe the request came from. If that pipe died in the meantime, #send_reply silently drops the reply (matches nng’s pipe_terminated behavior).

  • TTL cap on the backtrace stack: 8 hops, matching nng’s default.
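The first three rules can be exercised with a toy model. This sketch is not NNQ::Routing::Rep itself: MiniRep, FakeConn, and deliver are hypothetical names, the stdlib Queue stands in for Async::Queue, and locking is omitted for brevity.

```ruby
# Stand-in for a pipe; records what was sent (hypothetical, for illustration).
class FakeConn
  attr_reader :sent
  attr_accessor :closed

  def initialize
    @sent = []
    @closed = false
  end

  def closed?
    @closed
  end

  def send_message(bytes)
    @sent << bytes
  end
end

# Toy model of the cooked-mode rules; not the library class.
class MiniRep
  def initialize
    @queue = Queue.new # stdlib queue in place of Async::Queue
    @pending = nil     # [conn, btrace] or nil
  end

  def deliver(conn, btrace, payload)
    @queue << [conn, btrace, payload]
  end

  def receive
    @pending = nil # an unanswered request is discarded here
    conn, btrace, payload = @queue.pop
    @pending = [conn, btrace]
    payload
  end

  def send_reply(body)
    raise "no pending request" unless @pending
    conn, btrace = @pending
    @pending = nil
    conn.send_message(btrace + body) unless conn.closed? # dead pipe: drop silently
  end
end

conn = FakeConn.new
rep  = MiniRep.new
id1  = [0x80000001].pack("N")
id2  = [0x80000002].pack("N")
rep.deliver(conn, id1, "first")
rep.deliver(conn, id2, "second")
rep.receive          # "first" becomes pending
rep.receive          # "first" is discarded; "second" becomes pending
rep.send_reply("ok") # the reply carries id2's backtrace, not id1's
```

After this sequence conn.sent contains a single message, id2 followed by "ok", and a further send_reply raises because nothing is pending.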

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary

Methods included from Backtrace

#parse_backtrace

Constructor Details

#initialize(engine) ⇒ Rep

Returns a new instance of Rep.



# File 'lib/nnq/routing/rep.rb', line 34

def initialize(engine)
  @engine     = engine
  @recv_queue = Async::Queue.new   # holds [conn, btrace, body]
  @pending    = nil                # [conn, btrace] or nil
  @mutex      = Mutex.new
end

Instance Method Details

#close ⇒ Object

Enqueues a nil sentinel on the receive queue, so a blocked #receive returns nil.



# File 'lib/nnq/routing/rep.rb', line 91

def close
  @recv_queue.enqueue(nil)
end

#close_read ⇒ Object

Enqueues a nil sentinel on the receive queue, exactly as #close does.



# File 'lib/nnq/routing/rep.rb', line 96

def close_read
  @recv_queue.enqueue(nil)
end

#connection_removed(conn) ⇒ Object

Clears the pending request if it arrived on conn.



# File 'lib/nnq/routing/rep.rb', line 84

def connection_removed(conn)
  @mutex.synchronize do
    @pending = nil if @pending && @pending[0] == conn
  end
end

#enqueue(body, conn) ⇒ Object

Called by the engine recv loop with each received message.



# File 'lib/nnq/routing/rep.rb', line 77

def enqueue(body, conn)
  btrace, payload = parse_backtrace(body)
  return unless btrace # malformed/over-TTL — drop
  @recv_queue.enqueue([conn, btrace, payload])
end

#receive ⇒ String?

Receives one request body. Stashes the backtrace + originating connection so the next #send_reply can route the reply back.

Returns:

  • (String, nil)

    body, or nil if the socket was closed



# File 'lib/nnq/routing/rep.rb', line 46

def receive
  # Any prior pending request is discarded — calling receive
  # again without replying is how users drop unwanted requests.
  @mutex.synchronize { @pending = nil }
  item = @recv_queue.dequeue

  return nil if item.nil?

  conn, btrace, body = item
  @mutex.synchronize { @pending = [conn, btrace] }
  body
end

#send_reply(body) ⇒ Object

Sends body as the reply to the most recently received request.

Parameters:

  • body (String)


# File 'lib/nnq/routing/rep.rb', line 63

def send_reply(body)
  conn, btrace = @mutex.synchronize do
    raise Error, "REP socket has no pending request to reply to" unless @pending
    taken    = @pending
    @pending = nil
    taken
  end

  return if conn.closed?
  conn.send_message(btrace + body)
end