Class: NNQ::Routing::SurveyorRaw

Inherits:
Object
  • Object
show all
Includes:
Backtrace
Defined in:
lib/nnq/routing/surveyor_raw.rb

Overview

Raw SURVEYOR: fans out surveys to all peers like cooked Surveyor, but without a survey window, survey-id matching, or timeout. Replies are delivered as ‘[pipe, header, body]` tuples so the app can correlate by header verbatim.

Each per-conn send queue holds ‘[header, body]` pairs and the pump calls `conn.write_message(body, header: header)` so the protocol-sp header kwarg is threaded through the fan-out —zero concat even on the broadcast path.

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary collapse

Methods included from Backtrace

#parse_backtrace, too_many_hops?

Constructor Details

#initialize(engine) ⇒ SurveyorRaw

Returns a new instance of SurveyorRaw.



23
24
25
26
27
28
# File 'lib/nnq/routing/surveyor_raw.rb', line 23

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @pump_tasks = {} # conn => Async::Task
  @recv_queue = Async::LimitedQueue.new(engine.options.recv_hwm)
end

Instance Method Details

#closeObject



74
75
76
77
78
79
# File 'lib/nnq/routing/surveyor_raw.rb', line 74

def close
  @pump_tasks.each_value(&:stop)
  @pump_tasks.clear
  @queues.clear
  @recv_queue.enqueue(nil)
end

#close_readObject



82
83
84
# File 'lib/nnq/routing/surveyor_raw.rb', line 82

def close_read
  @recv_queue.enqueue(nil)
end

#connection_added(conn) ⇒ Object



50
51
52
53
54
# File 'lib/nnq/routing/surveyor_raw.rb', line 50

def connection_added(conn)
  queue             = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn]     = queue
  @pump_tasks[conn] = spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



57
58
59
60
61
62
63
64
65
66
# File 'lib/nnq/routing/surveyor_raw.rb', line 57

def connection_removed(conn)
  @queues.delete(conn)
  task = @pump_tasks.delete(conn)

  return unless task
  return if task == Async::Task.current

  task.stop
rescue IOError, Errno::EPIPE
end

#enqueue(wire_bytes, conn) ⇒ Object



43
44
45
46
47
# File 'lib/nnq/routing/surveyor_raw.rb', line 43

def enqueue(wire_bytes, conn)
  header, payload = parse_backtrace(wire_bytes)
  return unless header
  @recv_queue.enqueue([conn, header, payload])
end

#receiveObject



38
39
40
# File 'lib/nnq/routing/surveyor_raw.rb', line 38

def receive
  @recv_queue.dequeue
end

#send(body, header:) ⇒ Object



31
32
33
34
35
# File 'lib/nnq/routing/surveyor_raw.rb', line 31

def send(body, header:)
  @queues.each_value do |q|
    q.enqueue([header, body]) unless q.limited?
  end
end

#send_queue_drained?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/nnq/routing/surveyor_raw.rb', line 69

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end