Class: NNQ::Routing::Surveyor

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/routing/surveyor.rb

Overview

SURVEYOR: broadcast side of the survey0 pattern.

Wire format: each survey is prepended with a 4-byte BE survey ID (high bit set — same terminal-marker convention as REQ). Replies carry the same ID back. Stale replies (wrong ID) are dropped.

Send side: fan-out to all connected respondents (like PUB). Each peer gets its own bounded queue and pump.

Recv side: replies are matched by survey ID. Only replies matching the current survey are delivered. After ‘survey_time` elapses, #receive raises TimedOut.

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Surveyor

Returns a new instance of Surveyor.



24
25
26
27
28
29
30
# File 'lib/nnq/routing/surveyor.rb', line 24

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @recv_queue = Async::Queue.new
  @current_id = nil
  @mutex      = Mutex.new
end

Instance Method Details

#closeObject



118
119
120
121
# File 'lib/nnq/routing/surveyor.rb', line 118

def close
  @queues.clear
  @recv_queue.enqueue(nil)
end

#close_readObject



124
125
126
# File 'lib/nnq/routing/surveyor.rb', line 124

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object



101
102
103
104
105
# File 'lib/nnq/routing/surveyor.rb', line 101

def connection_added(conn)
  queue         = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn] = queue
  spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



108
109
110
# File 'lib/nnq/routing/surveyor.rb', line 108

def connection_removed(conn)
  @queues.delete(conn)
end

#direct_recv_for(_conn) ⇒ Object

Inproc fast-path hook. Transform filters replies by current survey id and strips the 4-byte header, mirroring #enqueue.



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/nnq/routing/surveyor.rb', line 82

def direct_recv_for(_conn)
  mutex = @mutex
  transform = lambda do |body|
    next nil if body.bytesize < 4
    id      = body.unpack1("N")
    payload = body.byteslice(4..)
    match   = mutex.synchronize { @current_id == id }
    match ? payload : nil
  end
  [@recv_queue, transform]
end

#enqueue(body, _conn) ⇒ Object

Called by the engine recv loop with each received message.



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/nnq/routing/surveyor.rb', line 66

def enqueue(body, _conn)
  return if body.bytesize < 4

  id      = body.unpack1("N")
  payload = body.byteslice(4..)

  @mutex.synchronize do
    return unless @current_id == id
  end

  @recv_queue.enqueue(payload)
end

#preview_body(wire) ⇒ Object

Strips the 4-byte survey id for verbose trace previews.



96
97
98
# File 'lib/nnq/routing/surveyor.rb', line 96

def preview_body(wire)
  wire.byteslice(4..) || wire
end

#receiveString

Receives the next reply within the survey window. Raises TimedOut when the window expires.

Returns:

  • (String)

    reply body



57
58
59
60
61
62
# File 'lib/nnq/routing/surveyor.rb', line 57

def receive
  survey_time = @engine.options.survey_time
  Fiber.scheduler.with_timeout(survey_time) { @recv_queue.dequeue }
rescue Async::TimeoutError
  raise NNQ::TimedOut, "survey timed out"
end

#send_queue_drained?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/nnq/routing/surveyor.rb', line 113

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end

#send_survey(body) ⇒ Object

Broadcasts body as a survey to all connected respondents. Starts a new survey window; any previous survey is abandoned.

Parameters:

  • body (String)


37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/nnq/routing/surveyor.rb', line 37

def send_survey(body)
  id = SecureRandom.random_number(0x80000000) | 0x80000000

  @mutex.synchronize do
    @current_id = id
  end

  header = [id].pack("N")
  wire   = header + body

  @queues.each_value do |q|
    q.enqueue(wire) unless q.limited?
  end
end