Class: NNQ::Routing::Bus
- Inherits:
-
Object
- Object
- NNQ::Routing::Bus
- Defined in:
- lib/nnq/routing/bus.rb
Overview
BUS0: best-effort bidirectional mesh.
Send side: fan-out to all connected peers. Each peer gets its own bounded send queue and pump fiber — a slow peer drops messages instead of blocking fast ones (same as PUB). Send never blocks.
Recv side: all incoming messages are pushed into a shared unbounded queue (same as PULL).
No SP headers in cooked mode — body on the wire is the user payload.
Instance Method Summary collapse
- #close ⇒ Object
- #close_read ⇒ Object
- #connection_added(conn) ⇒ Object
- #connection_removed(conn) ⇒ Object
-
#enqueue(body, _conn = nil) ⇒ Object
Called by the engine recv loop with each received message.
-
#initialize(engine) ⇒ Bus
constructor
A new instance of Bus.
-
#receive ⇒ String?
Message body, or nil once the socket is closed.
-
#send(body) ⇒ Object
Broadcasts
bodyto every connected peer. - #send_queue_drained? ⇒ Boolean
Constructor Details
#initialize(engine) ⇒ Bus
Returns a new instance of Bus.
22 23 24 25 26 27 |
# File 'lib/nnq/routing/bus.rb', line 22 def initialize(engine) @engine = engine @queues = {} # conn => Async::LimitedQueue @pump_tasks = {} # conn => Async::Task @recv_queue = Async::Queue.new end |
Instance Method Details
#close ⇒ Object
75 76 77 78 79 80 |
# File 'lib/nnq/routing/bus.rb', line 75 def close @pump_tasks.each_value(&:stop) @pump_tasks.clear @queues.clear @recv_queue.enqueue(nil) end |
#close_read ⇒ Object
83 84 85 |
# File 'lib/nnq/routing/bus.rb', line 83 def close_read @recv_queue.enqueue(nil) end |
#connection_added(conn) ⇒ Object
53 54 55 56 57 |
# File 'lib/nnq/routing/bus.rb', line 53 def connection_added(conn) queue = Async::LimitedQueue.new(@engine..send_hwm) @queues[conn] = queue @pump_tasks[conn] = spawn_pump(conn, queue) end |
#connection_removed(conn) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/nnq/routing/bus.rb', line 60 def connection_removed(conn) @queues.delete(conn) task = @pump_tasks.delete(conn) return unless task return if task == Async::Task.current task.stop rescue IOError, Errno::EPIPE end |
#enqueue(body, _conn = nil) ⇒ Object
Called by the engine recv loop with each received message.
42 43 44 |
# File 'lib/nnq/routing/bus.rb', line 42 def enqueue(body, _conn = nil) @recv_queue.enqueue(body) end |
#receive ⇒ String?
Returns message body, or nil once the socket is closed.
48 49 50 |
# File 'lib/nnq/routing/bus.rb', line 48 def receive @recv_queue.dequeue end |
#send(body) ⇒ Object
Broadcasts body to every connected peer. Non-blocking per peer: drops when a peer’s queue is at HWM.
34 35 36 37 38 |
# File 'lib/nnq/routing/bus.rb', line 34 def send(body) @queues.each_value do |queue| queue.enqueue(body) unless queue.limited? end end |
#send_queue_drained? ⇒ Boolean
70 71 72 |
# File 'lib/nnq/routing/bus.rb', line 70 def send_queue_drained? @queues.each_value.all? { |q| q.empty? } end |