Class: NNQ::Routing::Rep
- Inherits: Object
- Includes: Backtrace
- Defined in: lib/nnq/routing/rep.rb
Overview
REP: server side of req0/rep0.
Wire format: incoming bodies start with a `[backtrace stack]`, followed by the payload. The backtrace is one or more 4-byte big-endian words; we keep reading words off the front until we hit one whose top byte has its high bit set (the original REQ’s request id terminates the stack). The whole backtrace is stashed and echoed verbatim on reply, prepended to the reply body. REP never reorders or rewrites the stack; it is a pure echo back to the originating pipe.
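The framing rule above can be sketched as a standalone function. This is only an illustration: `split_backtrace` and `MAX_HOPS` are hypothetical names, not NNQ's `Backtrace#parse_backtrace`, and the way hops are counted against the 8-hop cap is an assumption.

```ruby
# Hypothetical helper illustrating the framing rule; not NNQ's implementation.
MAX_HOPS = 8 # assumed to mirror the TTL cap described in this overview

# Returns [backtrace, payload], or nil if the body is malformed or over the cap.
def split_backtrace(body, max_hops: MAX_HOPS)
  offset = 0
  hops = 0
  loop do
    return nil if body.bytesize < offset + 4 # ran out before a terminating word
    hops += 1
    return nil if hops > max_hops            # too many hops (assumed counting rule)
    top_byte = body.getbyte(offset)
    offset += 4
    break if (top_byte & 0x80) != 0          # high bit set: this word is the request id
  end
  [body.byteslice(0, offset), body.byteslice(offset, body.bytesize - offset)]
end
```

For a body made of one router hop word, one request id word, and the bytes `hello`, the function returns the first eight bytes as the backtrace and `hello` as the payload.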
Semantics (cooked mode):
- At most one pending request at a time. Calling #receive while a previous request is pending silently discards that request: its backtrace is forgotten and any later #send_reply will target the new request. This matches nng cooked rep0, where nng_recvmsg after nng_recvmsg drops the earlier message.
- Calling #send_reply with no pending request raises.
- The reply must be routed back to the same pipe the request came from. If that pipe died in the meantime, #send_reply silently drops the reply (matches nng’s pipe_terminated behavior).
- TTL cap on the backtrace stack: 8 hops, matching nng’s default.
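The first three rules can be modelled in a few lines. The sketch below is a toy, single-threaded model and not NNQ code: `ToyRep`, `FakePipe`, and `deliver` are hypothetical names, and a plain `Queue` stands in for the real receive queue.

```ruby
# Toy model of the cooked-mode rules; all names here are hypothetical.
FakePipe = Struct.new(:name, :closed, :sent) do
  def closed?
    closed
  end
end

class ToyRep
  def initialize
    @queue = Queue.new
    @pending = nil # [pipe, backtrace] of the request awaiting a reply
  end

  # Stands in for the engine handing over a parsed request.
  def deliver(pipe, btrace, body)
    @queue << [pipe, btrace, body]
  end

  def receive
    @pending = nil # a second receive drops the earlier request
    pipe, btrace, body = @queue.pop
    @pending = [pipe, btrace]
    body
  end

  def send_reply(body)
    raise "no pending request to reply to" unless @pending
    pipe, btrace = @pending
    @pending = nil
    return if pipe.closed?      # originating pipe died: drop silently
    pipe.sent << (btrace + body) # backtrace echoed verbatim, then the body
  end
end
```

Receiving twice and then replying sends the reply only to the pipe of the second request; a further `send_reply` raises because nothing is pending.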
Constant Summary
Constants included from Backtrace
Instance Method Summary
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_removed(conn) ⇒ Object
- #direct_recv_for(conn) ⇒ Object
  Inproc fast-path hook: peer pipe parses the backtrace and enqueues the same [conn, btrace, payload] tuple the pump would.
- #enqueue(body, conn) ⇒ Object
  Called by the engine recv loop with each received message.
- #initialize(engine) ⇒ Rep (constructor)
  A new instance of Rep.
- #preview_body(wire) ⇒ Object
  Strips the backtrace header for verbose trace previews.
- #receive ⇒ String?
  Receives one request body.
- #send_reply(body) ⇒ Object
  Sends body as the reply to the most recently received request.
Methods included from Backtrace
#parse_backtrace, too_many_hops?
Constructor Details
#initialize(engine) ⇒ Rep
Returns a new instance of Rep.
    # File 'lib/nnq/routing/rep.rb', line 34

    def initialize(engine)
      @engine     = engine
      @recv_queue = Async::Queue.new # holds [conn, btrace, body]
      @pending    = nil              # [conn, btrace] or nil
      @mutex      = Mutex.new
    end
Instance Method Details
#close ⇒ Object
    # File 'lib/nnq/routing/rep.rb', line 110

    def close
      @recv_queue.enqueue(nil)
    end
#close_read ⇒ Object
    # File 'lib/nnq/routing/rep.rb', line 115

    def close_read
      @recv_queue.enqueue(nil)
    end
#connection_removed(conn) ⇒ Object
    # File 'lib/nnq/routing/rep.rb', line 103

    def connection_removed(conn)
      @mutex.synchronize do
        @pending = nil if @pending && @pending[0] == conn
      end
    end
#direct_recv_for(conn) ⇒ Object
Inproc fast-path hook: peer pipe parses the backtrace and enqueues the same [conn, btrace, payload] tuple the pump would.
    # File 'lib/nnq/routing/rep.rb', line 94

    def direct_recv_for(conn)
      transform = lambda do |body|
        btrace, payload = parse_backtrace(body)
        btrace ? [conn, btrace, payload] : nil
      end
      [@recv_queue, transform]
    end
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
    # File 'lib/nnq/routing/rep.rb', line 85

    def enqueue(body, conn)
      btrace, payload = parse_backtrace(body)
      return unless btrace # malformed/over-TTL: drop
      @recv_queue.enqueue([conn, btrace, payload])
    end
#preview_body(wire) ⇒ Object
Strips the backtrace header for verbose trace previews.
    # File 'lib/nnq/routing/rep.rb', line 78

    def preview_body(wire)
      _, payload = parse_backtrace(wire)
      payload || wire
    end
#receive ⇒ String?
Receives one request body. Stashes the backtrace + originating connection so the next #send_reply can route the reply back.
    # File 'lib/nnq/routing/rep.rb', line 46

    def receive
      # Any prior pending request is discarded; calling receive
      # again without replying is how users drop unwanted requests.
      @mutex.synchronize { @pending = nil }

      item = @recv_queue.dequeue
      return nil if item.nil?

      conn, btrace, body = item
      @mutex.synchronize { @pending = [conn, btrace] }
      body
    end
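Because #receive returns nil once #close or #close_read has enqueued the nil sentinel, a request loop can simply run until it sees nil. The sketch below is a minimal illustration: `serve` is a hypothetical helper, and `rep` is assumed to be any object responding to #receive and #send_reply. How a Rep instance is obtained, and the Async task it presumably has to run in, are not shown.

```ruby
# Hypothetical request loop built only on #receive and #send_reply.
# Yields each request body to the block and replies with the block's result.
# Returns the number of requests handled.
def serve(rep)
  handled = 0
  while (request = rep.receive) # nil means the socket was closed
    rep.send_reply(yield(request))
    handled += 1
  end
  handled
end
```

Replying before the next #receive matters here: per the cooked-mode rules, calling #receive again first would discard the pending request.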
#send_reply(body) ⇒ Object
Sends body as the reply to the most recently received request.
    # File 'lib/nnq/routing/rep.rb', line 63

    def send_reply(body)
      conn, btrace = @mutex.synchronize do
        raise Error, "REP socket has no pending request to reply to" unless @pending
        taken = @pending
        @pending = nil
        taken
      end

      return if conn.closed?
      conn.(body, header: btrace)
      @engine.emit_verbose_msg_sent(body)
    end