Class: NNQ::Routing::Respondent

Inherits:
Object
  • Object
show all
Includes:
Backtrace
Defined in:
lib/nnq/routing/respondent.rb

Overview

RESPONDENT: reply side of the survey0 pattern.

Semantics mirror REP: strict alternation of #receive then #send_reply. The backtrace (survey ID + any hop IDs) is stripped on receive and echoed verbatim on reply.

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary collapse

Methods included from Backtrace

#parse_backtrace

Constructor Details

#initialize(engine) ⇒ Respondent

Returns a new instance of Respondent.



18
19
20
21
22
23
# File 'lib/nnq/routing/respondent.rb', line 18

def initialize(engine)
  @engine     = engine
  @recv_queue = Async::Queue.new
  @pending    = nil
  @mutex      = Mutex.new
end

Instance Method Details

#closeObject



73
74
75
# File 'lib/nnq/routing/respondent.rb', line 73

def close
  @recv_queue.enqueue(nil)
end

#close_readObject



78
79
80
# File 'lib/nnq/routing/respondent.rb', line 78

def close_read
  @recv_queue.enqueue(nil)
end

#connection_removed(conn) ⇒ Object



66
67
68
69
70
# File 'lib/nnq/routing/respondent.rb', line 66

def connection_removed(conn)
  @mutex.synchronize do
    @pending = nil if @pending && @pending[0] == conn
  end
end

#enqueue(body, conn) ⇒ Object

Called by the engine recv loop with each received message.



59
60
61
62
63
# File 'lib/nnq/routing/respondent.rb', line 59

def enqueue(body, conn)
  btrace, payload = parse_backtrace(body)
  return unless btrace # malformed/over-TTL — drop
  @recv_queue.enqueue([conn, btrace, payload])
end

#receiveString?

Receives the next survey body. Stashes the backtrace + originating connection for the subsequent #send_reply.

Returns:

  • (String, nil)

    survey body, or nil if the socket was closed



30
31
32
33
34
35
36
37
38
39
# File 'lib/nnq/routing/respondent.rb', line 30

def receive
  @mutex.synchronize { @pending = nil }
  item = @recv_queue.dequeue

  return nil if item.nil?

  conn, btrace, body = item
  @mutex.synchronize { @pending = [conn, btrace] }
  body
end

#send_reply(body) ⇒ Object

Sends body as the reply to the most recently received survey.

Parameters:

  • body (String)


45
46
47
48
49
50
51
52
53
54
55
# File 'lib/nnq/routing/respondent.rb', line 45

def send_reply(body)
  conn, btrace = @mutex.synchronize do
    raise Error, "RESPONDENT socket has no pending survey to reply to" unless @pending
    taken    = @pending
    @pending = nil
    taken
  end

  return if conn.closed?
  conn.send_message(btrace + body)
end