Class: NNQ::Routing::Respondent
- Inherits:
-
Object
- Object
- NNQ::Routing::Respondent
- Includes:
- Backtrace
- Defined in:
- lib/nnq/routing/respondent.rb
Overview
RESPONDENT: reply side of the survey0 pattern.
Semantics mirror REP: strict alternation of #receive then #send_reply. The backtrace (survey ID + any hop IDs) is stripped on receive and echoed verbatim on reply.
Constant Summary
Constants included from Backtrace
Instance Method Summary collapse
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_removed(conn) ⇒ Object
-
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
-
#initialize(engine) ⇒ Respondent
constructor
A new instance of Respondent.
-
#preview_body(wire) ⇒ Object
Strips the backtrace header for verbose trace previews.
-
#receive ⇒ String?
Receives the next survey body.
-
#send_reply(body) ⇒ Object
Sends
bodyas the reply to the most recently received survey.
Methods included from Backtrace
#parse_backtrace, too_many_hops?
Constructor Details
#initialize(engine) ⇒ Respondent
Returns a new instance of Respondent.
18 19 20 21 22 23 |
# File 'lib/nnq/routing/respondent.rb', line 18 def initialize(engine) @engine = engine @recv_queue = Async::Queue.new @pending = nil @mutex = Mutex.new end |
Instance Method Details
#close ⇒ Object
81 82 83 |
# File 'lib/nnq/routing/respondent.rb', line 81 def close @recv_queue.enqueue(nil) end |
#close_read ⇒ Object
86 87 88 |
# File 'lib/nnq/routing/respondent.rb', line 86 def close_read @recv_queue.enqueue(nil) end |
#connection_removed(conn) ⇒ Object
74 75 76 77 78 |
# File 'lib/nnq/routing/respondent.rb', line 74 def connection_removed(conn) @mutex.synchronize do @pending = nil if @pending && @pending[0] == conn end end |
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
67 68 69 70 71 |
# File 'lib/nnq/routing/respondent.rb', line 67 def enqueue(body, conn) btrace, payload = parse_backtrace(body) return unless btrace # malformed/over-TTL — drop @recv_queue.enqueue([conn, btrace, payload]) end |
#preview_body(wire) ⇒ Object
Strips the backtrace header for verbose trace previews.
60 61 62 63 |
# File 'lib/nnq/routing/respondent.rb', line 60 def preview_body(wire) _, payload = parse_backtrace(wire) payload || wire end |
#receive ⇒ String?
Receives the next survey body. Stashes the backtrace + originating connection for the subsequent #send_reply.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/nnq/routing/respondent.rb', line 30 def receive @mutex.synchronize { @pending = nil } item = @recv_queue.dequeue return nil if item.nil? conn, btrace, body = item @mutex.synchronize { @pending = [conn, btrace] } body end |
#send_reply(body) ⇒ Object
Sends body as the reply to the most recently received survey.
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/nnq/routing/respondent.rb', line 45 def send_reply(body) conn, btrace = @mutex.synchronize do raise Error, "RESPONDENT socket has no pending survey to reply to" unless @pending taken = @pending @pending = nil taken end return if conn.closed? conn.(body, header: btrace) @engine.emit_verbose_msg_sent(body) end |