Class: NNQ::Routing::SurveyorRaw
- Inherits:
- Object
- Includes:
- Backtrace
- Defined in:
- lib/nnq/routing/surveyor_raw.rb
Overview
Raw SURVEYOR: fans out surveys to all peers like the cooked Surveyor, but without a survey window, survey-id matching, or timeout. Replies are delivered as `[pipe, header, body]` tuples so the app can correlate them by the verbatim header.
Each per-connection send queue holds `[header, body]` pairs, and the pump calls `conn.write_message(body, header: header)`, so the protocol-sp header kwarg is threaded through the fan-out: no concatenation, even on the broadcast path.
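The queue-and-pump arrangement described above can be sketched with plain Ruby threads. This is a minimal illustration, not the class's implementation: `FakeConn` is a hypothetical stand-in for a connection, and the stdlib `SizedQueue` replaces the `Async::LimitedQueue` the real class uses.

```ruby
# Hypothetical connection that records what the pump writes.
class FakeConn
  attr_reader :written

  def initialize
    @written = []
  end

  # Same call shape as in the overview: body first, header as a keyword.
  def write_message(body, header:)
    @written << [header, body]
  end
end

conns  = [FakeConn.new, FakeConn.new]
queues = conns.to_h { |conn| [conn, SizedQueue.new(4)] }

# One pump per connection: pop a [header, body] pair and write it
# without ever joining the two strings.
pumps = queues.map do |conn, queue|
  Thread.new do
    while (pair = queue.pop)
      header, body = pair
      conn.write_message(body, header: header)
    end
  end
end

# Fan out one survey: every queue receives the same two string objects.
header = [0x8000_0001].pack("N")
body   = "who is alive?"
queues.each_value { |q| q.push([header, body]) }

# A nil sentinel ends each pump loop in this sketch.
queues.each_value { |q| q.push(nil) }
pumps.each(&:join)
```

Because the pair travels through the queue as two separate objects, each connection sees the very same `body` string that the sender passed in.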
Constant Summary
Constants included from Backtrace
Backtrace::MAX_HOPS
Instance Method Summary
Methods included from Backtrace
#parse_backtrace, #too_many_hops?
Constructor Details
Returns a new instance of SurveyorRaw.
# File 'lib/nnq/routing/surveyor_raw.rb', line 23
def initialize(engine)
  @engine = engine
  @queues = {}      # conn => send queue
  @pump_tasks = {}  # conn => pump task
  @recv_queue = Async::LimitedQueue.new(engine.options.recv_hwm)
end
Instance Method Details
#close ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 74
def close
  @pump_tasks.each_value(&:stop)
  @pump_tasks.clear
  @queues.clear
  @recv_queue.enqueue(nil)
end
#close_read ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 82
def close_read
  @recv_queue.enqueue(nil)
end
#connection_added(conn) ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 50
def connection_added(conn)
  queue = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn] = queue
  @pump_tasks[conn] = spawn_pump(conn, queue)
end
#connection_removed(conn) ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 57
def connection_removed(conn)
  @queues.delete(conn)
  task = @pump_tasks.delete(conn)
  return unless task
  # Do not stop the task this method is running in.
  return if task == Async::Task.current
  task.stop
rescue IOError, Errno::EPIPE
end
#enqueue(wire_bytes, conn) ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 43
def enqueue(wire_bytes, conn)
  header, payload = parse_backtrace(wire_bytes)
  return unless header
  @recv_queue.enqueue([conn, header, payload])
end
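The body of `parse_backtrace` is not shown on this page. The sketch below illustrates one plausible way to split wire bytes into a header and a payload, assuming the common SP convention that a backtrace is a run of 4-byte big-endian words ending with a word whose high bit is set. The method name `parse_backtrace_sketch` and the constant `MAX_HOPS_SKETCH` (with its value of 8) are hypothetical and are not taken from the Backtrace module.

```ruby
# Hypothetical hop limit; the real Backtrace::MAX_HOPS value is not shown.
MAX_HOPS_SKETCH = 8

# Returns [header, payload], or nil when the header is truncated or
# runs past MAX_HOPS_SKETCH words without a terminating word.
def parse_backtrace_sketch(wire_bytes)
  offset = 0
  hops = 0
  loop do
    return nil if hops >= MAX_HOPS_SKETCH
    return nil if offset + 4 > wire_bytes.bytesize

    word = wire_bytes.byteslice(offset, 4).unpack1("N")
    offset += 4
    hops += 1
    break if (word & 0x8000_0000) != 0 # high bit marks the final word
  end
  payload = wire_bytes.byteslice(offset, wire_bytes.bytesize - offset)
  [wire_bytes.byteslice(0, offset), payload]
end

wire = [1, 0x8000_0002].pack("NN") + "hello"
header, payload = parse_backtrace_sketch(wire)
```

Under these assumptions a nil result covers both malformed and over-long headers, which would let a caller drop such messages with a single check.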
#receive ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 38
def receive
  @recv_queue.dequeue
end
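Since `#close` and `#close_read` both enqueue `nil`, a consumer can treat a `nil` from `#receive` as the end of the stream. The following sketch shows such a loop grouping replies by their verbatim header. It uses the stdlib `Queue` as a stand-in for the receive queue, and the pipe names and header strings are made up for illustration.

```ruby
recv_queue = Queue.new # stand-in for the bounded receive queue

# Producer side: two replies, then the nil that closing enqueues.
recv_queue.push([:pipe_a, "hdr-1", "pong"])
recv_queue.push([:pipe_b, "hdr-1", "pong too"])
recv_queue.push(nil)

# Consumer side: group reply bodies by the verbatim header bytes.
replies = Hash.new { |hash, key| hash[key] = [] }
while (tuple = recv_queue.pop)
  _pipe, header, body = tuple
  replies[header] << body
end
```

With no survey window or timeout in the raw socket, deciding when a survey is complete is left to application code like this loop.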
#send(body, header:) ⇒ Object
# File 'lib/nnq/routing/surveyor_raw.rb', line 31
def send(body, header:)
  @queues.each_value do |q|
    # Skip any peer whose queue is already at its limit.
    q.enqueue([header, body]) unless q.limited?
  end
end
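The `unless q.limited?` guard means a peer whose send queue is full is skipped rather than waited on. The sketch below reproduces that behavior with a hypothetical `BoundedQueue` class that models only the methods used here; it is not `Async::LimitedQueue`, and `fan_out` is an illustrative name.

```ruby
# Hypothetical bounded queue modelling only limited?, enqueue and size.
class BoundedQueue
  def initialize(limit)
    @limit = limit
    @items = []
  end

  def limited?
    @items.size >= @limit
  end

  def enqueue(item)
    @items << item
  end

  def size
    @items.size
  end
end

# Same shape as #send: skip any queue that is already full.
def fan_out(queues, body, header:)
  queues.each_value do |q|
    q.enqueue([header, body]) unless q.limited?
  end
end

queues = { fast: BoundedQueue.new(8), slow: BoundedQueue.new(1) }
3.times { |i| fan_out(queues, "survey #{i}", header: "h#{i}") }

fast_size = queues[:fast].size # all three surveys were queued
slow_size = queues[:slow].size # only the first; the other two were skipped
```

The trade-off in this sketch is that one slow peer cannot stall the broadcast, at the cost of that peer silently missing surveys.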
#send_queue_drained? ⇒ Boolean
# File 'lib/nnq/routing/surveyor_raw.rb', line 69
def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end