Class: NNQ::Routing::Respondent
- Inherits: Object
- Includes: Backtrace
- Defined in: lib/nnq/routing/respondent.rb
Overview
RESPONDENT: reply side of the survey0 pattern.
Semantics mirror REP: strict alternation of #receive then #send_reply. The backtrace (survey ID + any hop IDs) is stripped on receive and echoed verbatim on reply.
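The alternation described above can be modelled with a small stand-in. This is a minimal sketch, not the NNQ implementation: `ToyRespondent`, `ToyConn`, and `deliver` are hypothetical names, a stdlib `Thread::Queue` replaces `Async::Queue`, and the 4-byte survey ID layout is an assumption.

```ruby
# Hypothetical connection double that records the frames written to it.
ToyConn = Struct.new(:frames) do
  def closed?
    false
  end
end

# Hypothetical model of the receive / send_reply alternation.
class ToyRespondent
  def initialize
    @queue   = Thread::Queue.new # stdlib stand-in for Async::Queue
    @pending = nil
    @mutex   = Mutex.new
  end

  # Stand-in for the engine handing over an already-parsed survey.
  def deliver(conn, btrace, payload)
    @queue.push([conn, btrace, payload])
  end

  def receive
    conn, btrace, payload = @queue.pop
    @mutex.synchronize { @pending = [conn, btrace] }
    payload # the backtrace is kept aside, not returned
  end

  def send_reply(body)
    conn, btrace = @mutex.synchronize do
      raise "no pending survey to reply to" unless @pending
      taken, @pending = @pending, nil
      taken
    end
    return if conn.closed?
    conn.frames << (btrace + body) # backtrace echoed in front of the reply
  end
end

conn      = ToyConn.new([])
sock      = ToyRespondent.new
survey_id = [0x8000_0001].pack("N") # assumed 4-byte ID
sock.deliver(conn, survey_id, "ping")

question = sock.receive
sock.send_reply("pong")

# A second reply without a new receive violates the alternation.
second_reply_error =
  begin
    sock.send_reply("again")
    nil
  rescue RuntimeError => e
    e.message
  end
```

In this model a reply is only accepted once per received survey, which is the same constraint the real class enforces through its pending slot.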
Constant Summary
Constants included from Backtrace
Instance Method Summary
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_removed(conn) ⇒ Object
- #enqueue(body, conn) ⇒ Object
  Called by the engine recv loop with each received message.
- #initialize(engine) ⇒ Respondent (constructor)
  A new instance of Respondent.
- #receive ⇒ String?
  Receives the next survey body.
- #send_reply(body) ⇒ Object
  Sends body as the reply to the most recently received survey.
Methods included from Backtrace
Constructor Details
#initialize(engine) ⇒ Respondent
Returns a new instance of Respondent.
# File 'lib/nnq/routing/respondent.rb', line 18
def initialize(engine)
  @engine = engine
  @recv_queue = Async::Queue.new
  @pending = nil # [conn, btrace] of the survey awaiting a reply
  @mutex = Mutex.new
end
Instance Method Details
#close ⇒ Object
# File 'lib/nnq/routing/respondent.rb', line 73
def close
  @recv_queue.enqueue(nil) # nil makes a waiting #receive return nil
end
#close_read ⇒ Object
# File 'lib/nnq/routing/respondent.rb', line 78
def close_read
  @recv_queue.enqueue(nil)
end
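Both #close and #close_read enqueue nil, and #receive returns nil when it dequeues nil, so a consumer can treat nil as an end-of-stream marker. The following is a minimal sketch of that pattern with a stdlib `Thread::Queue` standing in for `Async::Queue`; the queue contents are made-up placeholders.

```ruby
queue = Thread::Queue.new # stdlib stand-in for Async::Queue
queue.push(["conn", "btrace", "survey body"])
queue.push(nil)           # what #close and #close_read enqueue

bodies = []
while (item = queue.pop)  # the nil sentinel ends the loop
  _conn, _btrace, body = item
  bodies << body
end
```

A loop of the form `while (body = socket.receive)` would terminate the same way once the socket is closed.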
#connection_removed(conn) ⇒ Object
# File 'lib/nnq/routing/respondent.rb', line 66
def connection_removed(conn)
  @mutex.synchronize do
    # Forget the pending survey only if it came from the removed connection.
    @pending = nil if @pending && @pending[0] == conn
  end
end
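The effect of #connection_removed on the pending slot can be illustrated in isolation. This sketch assumes a hypothetical `PendingSlot` class that copies only the slot logic from the listing above; `stash` and `pending?` are illustrative helpers that do not exist on Respondent.

```ruby
# Hypothetical class isolating the pending-survey slot.
class PendingSlot
  def initialize
    @pending = nil
    @mutex   = Mutex.new
  end

  # Illustrative helper: records the survey awaiting a reply.
  def stash(conn, btrace)
    @mutex.synchronize { @pending = [conn, btrace] }
  end

  # Same guard as Respondent#connection_removed.
  def connection_removed(conn)
    @mutex.synchronize do
      @pending = nil if @pending && @pending[0] == conn
    end
  end

  # Illustrative helper: reports whether a reply is still owed.
  def pending?
    @mutex.synchronize { !@pending.nil? }
  end
end

surveyor  = Object.new # connection the survey arrived on
bystander = Object.new # unrelated connection
slot      = PendingSlot.new
slot.stash(surveyor, "\x80\x00\x00\x01".b)

slot.connection_removed(bystander)
still_pending = slot.pending?  # unrelated removal leaves the slot alone

slot.connection_removed(surveyor)
cleared = !slot.pending?       # losing the surveyor clears the slot
```

After the slot is cleared, a subsequent #send_reply on the real class would raise, since there is no pending survey left.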
#enqueue(body, conn) ⇒ Object
Called by the engine recv loop with each received message.
# File 'lib/nnq/routing/respondent.rb', line 59
def enqueue(body, conn)
  btrace, payload = parse_backtrace(body)
  return unless btrace # malformed or over-TTL: drop
  @recv_queue.enqueue([conn, btrace, payload])
end
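The source of parse_backtrace is not shown on this page. As a rough illustration of what splitting a body into backtrace and payload could look like, the sketch below assumes the header is a run of 4-byte big-endian words ending at the first word with its high bit set, and that more than 8 hops counts as over-TTL. The layout, the `MAX_HOPS` value, and the name `sketch_parse_backtrace` are all assumptions, not the Backtrace module's actual code.

```ruby
MAX_HOPS = 8 # assumed TTL; the real limit is defined by Backtrace

# Returns [backtrace_bytes, payload], or nil when the header is
# truncated or carries more than MAX_HOPS hop IDs.
def sketch_parse_backtrace(body)
  offset = 0
  hops   = 0
  while offset + 4 <= body.bytesize
    word = body.byteslice(offset, 4).unpack1("N")
    offset += 4
    if (word & 0x8000_0000) != 0 # high bit marks the terminal ID (assumed)
      payload = body.byteslice(offset, body.bytesize - offset)
      return [body.byteslice(0, offset), payload]
    end
    hops += 1
    return nil if hops > MAX_HOPS
  end
  nil # ran out of bytes before a terminal ID was found
end

hop   = [0x0000_0007].pack("N")
id    = [0x8000_002A].pack("N")
frame = hop + id + "hello".b

btrace, payload = sketch_parse_backtrace(frame)
truncated = sketch_parse_backtrace("abc".b)
over_ttl  = sketch_parse_backtrace((hop * 9) + id)
```

A nil result corresponds to the early return in #enqueue, where the message is dropped without reaching the receive queue.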
#receive ⇒ String?
Receives the next survey body. Stashes the backtrace + originating connection for the subsequent #send_reply.
# File 'lib/nnq/routing/respondent.rb', line 30
def receive
  # Discard any survey that was received but never answered.
  @mutex.synchronize { @pending = nil }
  item = @recv_queue.dequeue
  return nil if item.nil?

  conn, btrace, body = item
  @mutex.synchronize { @pending = [conn, btrace] }
  body
end
#send_reply(body) ⇒ Object
Sends body as the reply to the most recently received survey.
# File 'lib/nnq/routing/respondent.rb', line 45
def send_reply(body)
  # Take and clear the pending survey under the lock.
  conn, btrace = @mutex.synchronize do
    raise Error, "RESPONDENT socket has no pending survey to reply to" unless @pending
    taken = @pending
    @pending = nil
    taken
  end

  return if conn.closed?
  conn.(btrace + body)
end