Class: NNQ::Routing::SurveyorRaw

Inherits:
Object
  • Object
Includes:
Backtrace
Defined in:
lib/nnq/routing/surveyor_raw.rb

Overview

Raw SURVEYOR: fans out surveys to all peers like the cooked Surveyor, but without a survey window, survey-id matching, or timeout. Replies are delivered as `[pipe, header, body]` tuples so the app can correlate replies itself, using the header verbatim.
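Because the raw socket does no survey-id matching, correlation is left to the application. The following is a minimal sketch of one way to do that, assuming the app keeps its own table of outstanding surveys keyed by header bytes. The header values, the `pending` table, and the peer symbols are all hypothetical, chosen only for illustration.

```ruby
# Hypothetical table of outstanding surveys, keyed by the exact header bytes
# that were sent with each survey.
pending = {
  "\x80\x00\x00\x01".b => "who is alive?",
  "\x80\x00\x00\x02".b => "who has key 42?",
}

# Replies in the [pipe, header, body] shape described above. The pipes are
# plain symbols here; in the real class they would be connection objects.
replies = [
  [:peer_a, "\x80\x00\x00\x02".b, "me"],
  [:peer_b, "\x80\x00\x00\x01".b, "alive"],
  [:peer_c, "\x80\x00\x00\x09".b, "stale"], # header matches no outstanding survey
]

# Keep only replies whose header matches a survey we still care about.
matched = replies.filter_map do |pipe, header, body|
  survey = pending[header]
  [survey, pipe, body] if survey
end
```

Unmatched replies are simply skipped in this sketch; an application could equally log or count them.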

Each per-connection send queue holds `[header, body]` pairs, and the pump calls `conn.write_message(body, header: header)`, so the protocol-sp header kwarg is threaded through the fan-out and header and body are never concatenated, even on the broadcast path.
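The pump described above can be pictured roughly as follows. This is a sketch, not the class's actual `spawn_pump`, whose source is not shown on this page: `FakeConn` and `pump_sketch` are hypothetical names, and a stdlib `Thread::Queue` with a thread stands in for the per-connection `Async::LimitedQueue` and its task. Only the `write_message(body, header:)` call shape is taken from the text.

```ruby
# Hypothetical stand-in for a connection.
class FakeConn
  attr_reader :written

  def initialize
    @written = []
  end

  def write_message(body, header:)
    # Record header and body separately; this sketch never joins them.
    @written << [header, body]
  end
end

# Drain [header, body] pairs from the queue and hand them to the connection.
def pump_sketch(conn, queue)
  Thread.new do
    # Thread::Queue#pop returns nil once the queue is closed and empty,
    # which ends the loop.
    while (pair = queue.pop)
      header, body = pair
      conn.write_message(body, header: header)
    end
  end
end

conn  = FakeConn.new
queue = Thread::Queue.new
pump  = pump_sketch(conn, queue)

queue << ["\x80\x00\x00\x01".b, "ping"]
queue.close
pump.join
```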

Constant Summary

Constants included from Backtrace

Backtrace::MAX_HOPS

Instance Method Summary

Methods included from Backtrace

#parse_backtrace, #too_many_hops?

Constructor Details

#initialize(engine) ⇒ SurveyorRaw

Returns a new instance of SurveyorRaw.



# File 'lib/nnq/routing/surveyor_raw.rb', line 23

def initialize(engine)
  @engine     = engine
  @queues     = {} # conn => Async::LimitedQueue
  @recv_queue = Async::LimitedQueue.new(engine.options.recv_hwm)
end

Instance Method Details

#close ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 82

def close
  @queues.clear
  @recv_queue.enqueue(nil)
end

#close_read ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 88

def close_read
  @recv_queue.enqueue(nil)
end
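Both `#close` and `#close_read` enqueue `nil` on the receive queue, which suggests that a consumer is expected to treat a `nil` from `#receive` as end-of-stream. A minimal sketch of such a consumer loop, assuming that interpretation and using a stdlib `Thread::Queue` in place of `Async::LimitedQueue` (the replies and peer symbols are made up):

```ruby
recv_queue = Thread::Queue.new

# Two replies in the [conn, header, payload] shape, then the nil sentinel
# that #close and #close_read push.
recv_queue << [:peer_a, "hdr1", "reply 1"]
recv_queue << [:peer_b, "hdr2", "reply 2"]
recv_queue << nil

bodies = []
# Stop as soon as the sentinel is dequeued.
while (reply = recv_queue.pop)
  _conn, _header, body = reply
  bodies << body
end
```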

#connection_added(conn) ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 65

def connection_added(conn)
  queue         = Async::LimitedQueue.new(@engine.options.send_hwm)
  @queues[conn] = queue
  spawn_pump(conn, queue)
end

#connection_removed(conn) ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 72

def connection_removed(conn)
  @queues.delete(conn)
end

#direct_recv_for(conn) ⇒ Object

Inproc fast-path hook.



# File 'lib/nnq/routing/surveyor_raw.rb', line 50

def direct_recv_for(conn)
  transform = lambda do |wire_bytes|
    header, payload = parse_backtrace(wire_bytes)
    header ? [conn, header, payload] : nil
  end
  [@recv_queue, transform]
end
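How the engine consumes the `[queue, transform]` pair is not shown on this page. One plausible reading is that an inproc peer applies the transform to each wire message and pushes any non-nil result straight onto the queue. The sketch below illustrates that reading only: `parse` is a hypothetical stand-in for `parse_backtrace` (it treats the first 4 bytes as the header), `deliver` is an invented helper, and `Thread::Queue` replaces `Async::LimitedQueue`.

```ruby
# Hypothetical parser: first 4 bytes are the header, the rest is the payload.
# Returns [nil, nil] when the message is too short to carry a header.
parse = lambda do |wire|
  if wire.bytesize >= 4
    [wire.byteslice(0, 4), wire.byteslice(4, wire.bytesize - 4)]
  else
    [nil, nil]
  end
end

recv_queue = Thread::Queue.new
conn       = :peer_a

# Same shape as the lambda built in #direct_recv_for.
transform = lambda do |wire_bytes|
  header, payload = parse.call(wire_bytes)
  header ? [conn, header, payload] : nil
end

# Hypothetical fast-path delivery: transform, then enqueue unless rejected.
deliver = lambda do |wire_bytes|
  item = transform.call(wire_bytes)
  recv_queue << item if item
end

deliver.call("ABCDhello") # well-formed: enqueued
deliver.call("xy")        # too short for a header: dropped
```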

#enqueue(wire_bytes, conn) ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 42

def enqueue(wire_bytes, conn)
  header, payload = parse_backtrace(wire_bytes)
  return unless header
  @recv_queue.enqueue([conn, header, payload])
end

#preview_body(wire) ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 59

def preview_body(wire)
  _, payload = parse_backtrace(wire)
  payload || wire
end

#receive ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 37

def receive
  @recv_queue.dequeue
end

#send(body, header:) ⇒ Object



# File 'lib/nnq/routing/surveyor_raw.rb', line 30

def send(body, header:)
  @queues.each_value do |q|
    q.enqueue([header, body]) unless q.limited?
  end
end
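The `unless q.limited?` guard means a survey is skipped for any peer whose send queue is already full, rather than blocking the whole fan-out on one slow peer. The sketch below demonstrates that drop behaviour. It assumes `limited?` returns true once the queue holds `limit` items; `TinyLimitedQueue` and `fan_out` are hypothetical stand-ins, not part of NNQ or the async gem.

```ruby
# Hypothetical, non-blocking stand-in for Async::LimitedQueue.
class TinyLimitedQueue
  def initialize(limit)
    @limit = limit
    @items = []
  end

  # Assumed semantics: true once the queue is at capacity.
  def limited?
    @items.size >= @limit
  end

  def enqueue(item)
    @items << item
  end

  def size
    @items.size
  end
end

# Same loop as #send above, with the queues passed in explicitly.
def fan_out(queues, body, header:)
  queues.each_value do |q|
    q.enqueue([header, body]) unless q.limited?
  end
end

queues = { fast: TinyLimitedQueue.new(2), slow: TinyLimitedQueue.new(1) }
3.times { |i| fan_out(queues, "survey #{i}", header: "h#{i}") }

# Each queue stops accepting surveys once it reaches its own limit.
sizes = queues.transform_values(&:size)
```

Nothing drains the queues in this sketch, so each one simply fills to its limit and drops the rest; in the real class the per-connection pump would be emptying them concurrently.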

#send_queue_drained? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/routing/surveyor_raw.rb', line 77

def send_queue_drained?
  @queues.each_value.all? { |q| q.empty? }
end