Class: NNQ::Routing::Rep
- Inherits:
-
Object
- Object
- NNQ::Routing::Rep
- Includes:
- Backtrace
- Defined in:
- lib/nnq/routing/rep.rb
Overview
REP: server side of req0/rep0.
Wire format: incoming bodies are ‘[backtrace stack]`. The backtrace is one or more 4-byte BE words; we keep reading words off the front until we hit one whose top byte has its high bit set (the original REQ’s request id terminates the stack). The whole backtrace is stashed and echoed verbatim on reply, prepended to the reply body. REP never reorders or rewrites the stack — it’s pure echo back to the originating pipe.
Semantics (cooked mode):
-
At most one pending request at a time. Calling #receive while a previous request is pending silently discards that request — its backtrace is forgotten and any later #send_reply will target the new request. This matches nng cooked rep0, where nng_recvmsg after nng_recvmsg drops the earlier message.
-
Calling #send_reply with no pending request raises.
-
The reply must be routed back to the same pipe the request came from. If that pipe died in the meantime, #send_reply silently drops the reply (matches nng’s pipe_terminated behavior).
-
TTL cap on the backtrace stack: 8 hops, matching nng’s default.
Constant Summary
Constants included from Backtrace
Instance Method Summary collapse
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_removed(conn) ⇒ Object
-
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
-
#initialize(engine) ⇒ Rep
constructor
A new instance of Rep.
-
#receive ⇒ String?
Receives one request body.
-
#send_reply(body) ⇒ Object
Sends
bodyas the reply to the most recently received request.
Methods included from Backtrace
Constructor Details
#initialize(engine) ⇒ Rep
Returns a new instance of Rep.
34 35 36 37 38 39 |
# File 'lib/nnq/routing/rep.rb', line 34 def initialize(engine) @engine = engine @recv_queue = Async::Queue.new # holds [conn, btrace, body] @pending = nil # [conn, btrace] or nil @mutex = Mutex.new end |
Instance Method Details
#close ⇒ Object
91 92 93 |
# File 'lib/nnq/routing/rep.rb', line 91 def close @recv_queue.enqueue(nil) end |
#close_read ⇒ Object
96 97 98 |
# File 'lib/nnq/routing/rep.rb', line 96 def close_read @recv_queue.enqueue(nil) end |
#connection_removed(conn) ⇒ Object
84 85 86 87 88 |
# File 'lib/nnq/routing/rep.rb', line 84 def connection_removed(conn) @mutex.synchronize do @pending = nil if @pending && @pending[0] == conn end end |
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
77 78 79 80 81 |
# File 'lib/nnq/routing/rep.rb', line 77 def enqueue(body, conn) btrace, payload = parse_backtrace(body) return unless btrace # malformed/over-TTL — drop @recv_queue.enqueue([conn, btrace, payload]) end |
#receive ⇒ String?
Receives one request body. Stashes the backtrace + originating connection so the next #send_reply can route the reply back.
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/nnq/routing/rep.rb', line 46 def receive # Any prior pending request is discarded — calling receive # again without replying is how users drop unwanted requests. @mutex.synchronize { @pending = nil } item = @recv_queue.dequeue return nil if item.nil? conn, btrace, body = item @mutex.synchronize { @pending = [conn, btrace] } body end |
#send_reply(body) ⇒ Object
Sends body as the reply to the most recently received request.
63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/nnq/routing/rep.rb', line 63 def send_reply(body) conn, btrace = @mutex.synchronize do raise Error, "REP socket has no pending request to reply to" unless @pending taken = @pending @pending = nil taken end return if conn.closed? conn.(btrace + body) end |