Class: NNQ::Routing::Surveyor

Inherits:
Object
Defined in:
lib/nnq/routing/surveyor.rb

Overview

SURVEYOR: broadcast side of the survey0 pattern.

Wire format: each survey is prefixed with a 4-byte big-endian survey ID whose high bit is set (the same terminal-marker convention as REQ). Replies carry the same ID back. Stale replies, whose ID does not match the current survey, are dropped.
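
The framing described above can be sketched with plain `Array#pack` and `String#unpack1`. The helper names `frame_survey` and `parse_reply` are hypothetical and not part of NNQ; the sketch only illustrates the 4-byte header layout, under the assumption that the body is treated as raw bytes.

```ruby
require "securerandom"

# Hypothetical helper: prefix a body with a 4-byte big-endian survey ID.
# The ID is a random 31-bit value with the high bit forced on.
def frame_survey(body, id = SecureRandom.random_number(0x80000000) | 0x80000000)
  # `.b` yields a binary copy, so a UTF-8 body cannot clash with the
  # binary header during concatenation.
  [id].pack("N") + body.b
end

# Hypothetical helper: split a wire message into [id, payload].
# Returns nil when the message is too short to carry an ID.
def parse_reply(wire)
  return nil if wire.bytesize < 4

  [wire.unpack1("N"), wire.byteslice(4..)]
end

wire        = frame_survey("ping", 0x80000001)
id, payload = parse_reply(wire)
puts format("id=0x%08x payload=%s", id, payload)
```

The `"N"` directive is a 32-bit unsigned integer in network (big-endian) byte order, which matches the header used by #send_survey and #enqueue.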

Send side: fan-out to all connected respondents (like PUB). Each peer gets its own bounded queue and pump.
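
A minimal sketch of the fan-out, using the stdlib `SizedQueue` as a stand-in for `Async::LimitedQueue` so that it runs without the async gem. The `fan_out` helper and the peer names are hypothetical. As in #send_survey, a peer whose queue is already full is skipped rather than allowed to block the broadcast.

```ruby
# Hypothetical helper: offer one wire message to every peer queue,
# skipping any queue that is already at capacity.
def fan_out(queues, wire)
  queues.each_value do |q|
    next if q.size >= q.max # full queue: this peer misses the survey

    q.push(wire)
  end
end

queues = {
  peer_a: SizedQueue.new(2),
  peer_b: SizedQueue.new(1),
}
queues[:peer_b] << "older" # peer_b is already at its limit

fan_out(queues, "survey")

puts queues[:peer_a].size # peer_a received the survey
puts queues[:peer_b].pop  # peer_b still holds only the older message
```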

Recv side: replies are matched by survey ID. Only replies matching the current survey are delivered. After `survey_time` elapses, #receive raises TimedOut.
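
The matching rule can be sketched as a small filter that remembers only the current survey ID. The `ReplyFilter` class is hypothetical and uses a stdlib `Queue` in place of `Async::Queue`; it is an illustration of the rule, not the class's actual implementation.

```ruby
# Hypothetical sketch: deliver only replies that carry the current survey ID.
class ReplyFilter
  def initialize
    @current_id = nil
    @mutex      = Mutex.new
    @replies    = Queue.new
  end

  # Starting a new survey abandons the previous one.
  def start_survey(id)
    @mutex.synchronize { @current_id = id }
  end

  # Returns true if the reply was delivered, false if it was dropped.
  def accept(wire)
    return false if wire.bytesize < 4

    id = wire.unpack1("N")
    return false unless @mutex.synchronize { @current_id == id }

    @replies << wire.byteslice(4..)
    true
  end

  def pending
    @replies.size
  end
end

filter = ReplyFilter.new
filter.start_survey(0x80000002)
filter.accept([0x80000001].pack("N") + "old") # stale ID, dropped
filter.accept([0x80000002].pack("N") + "new") # current ID, delivered
puts filter.pending
```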

Constructor Details

#initialize(engine) ⇒ Surveyor

Returns a new instance of Surveyor.



# File 'lib/nnq/routing/surveyor.rb', line 24

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @pump_tasks = {} # conn => Async::Task
  @recv_queue = Async::Queue.new
  @current_id = nil
  @mutex      = Mutex.new
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/surveyor.rb', line 105

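def close
  # Stop every per-connection pump and discard pending outbound surveys.
  @pump_tasks.each_value(&:stop)
  @pump_tasks.clear
  @queues.clear
  # A nil sentinel wakes any fiber blocked in #receive.
  @recv_queue.enqueue(nil)
end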

#close_read ⇒ Object



# File 'lib/nnq/routing/surveyor.rb', line 113

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object



# File 'lib/nnq/routing/surveyor.rb', line 81

def connection_added(conn)
  queue             = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn]     = queue
  @pump_tasks[conn] = spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



# File 'lib/nnq/routing/surveyor.rb', line 88

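def connection_removed(conn)
  @queues.delete(conn)
  task = @pump_tasks.delete(conn)

  return unless task
  # A pump that is removing its own connection must not stop itself.
  return if task == Async::Task.current

  task.stop
rescue IOError, Errno::EPIPE
  # The connection is already gone; errors while stopping are ignored.
end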

#enqueue(body, _conn) ⇒ Object

Called by the engine recv loop with each received message.



# File 'lib/nnq/routing/surveyor.rb', line 67

def enqueue(body, _conn)
  return if body.bytesize < 4 # too short to carry a survey ID

  id      = body.unpack1("N")
  payload = body.byteslice(4..)

  # Drop stale replies that belong to an earlier survey.
  @mutex.synchronize do
    return unless @current_id == id
  end

  @recv_queue.enqueue(payload)
end

#receive ⇒ String

Receives the next reply for the current survey. Each call waits up to survey_time and raises NNQ::TimedOut if no reply arrives in that time.

Returns:

  • (String)

    reply body



# File 'lib/nnq/routing/surveyor.rb', line 58

def receive
  survey_time = @engine.options.survey_time
  Fiber.scheduler.with_timeout(survey_time) { @recv_queue.dequeue }
rescue Async::TimeoutError
  raise NNQ::TimedOut, "survey timed out"
end
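
A caller typically drains replies until the timeout fires. The sketch below imitates that loop with stdlib pieces only: `Timeout.timeout` and `Queue` replace the fiber scheduler and `Async::Queue`, and `SurveyTimedOut` is a hypothetical stand-in for NNQ::TimedOut. The helper names are assumptions made for illustration.

```ruby
require "timeout"

# Hypothetical stand-in for NNQ::TimedOut so the sketch runs without the gem.
class SurveyTimedOut < StandardError; end

# Waits up to `seconds` for one reply and translates the stdlib
# timeout error into the survey-specific one.
def receive_with_window(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  raise SurveyTimedOut, "survey timed out"
end

# Collects replies until a wait passes with no reply.
def collect_replies(queue, seconds)
  replies = []
  loop { replies << receive_with_window(queue, seconds) }
rescue SurveyTimedOut
  replies
end

queue = Queue.new
queue << "reply from a"
queue << "reply from b"
p collect_replies(queue, 0.05)
```

Translating the timeout into a domain-specific error keeps callers independent of the scheduler in use, which is the same design choice #receive makes with Async::TimeoutError.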

#send_queue_drained? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/routing/surveyor.rb', line 100

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end

#send_survey(body) ⇒ Object

Broadcasts body as a survey to all connected respondents. Starts a new survey window; any previous survey is abandoned.

Parameters:

  • body (String)


# File 'lib/nnq/routing/surveyor.rb', line 38

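def send_survey(body)
  # Random 31-bit value with the high bit forced on (terminal marker).
  id = SecureRandom.random_number(0x80000000) | 0x80000000

  @mutex.synchronize do
    @current_id = id
  end

  header = [id].pack("N") # 4-byte big-endian survey ID
  wire   = header + body

  # Skip peers whose queue is full instead of blocking the broadcast.
  @queues.each_value do |q|
    q.enqueue(wire) unless q.limited?
  end
end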